-1

I'm trying to get to run two parallel asynchronous processes in a loop, one main and one via COPROC, in order to be able to collect and synchronize results, when both have finished (for each loop), to speed up large-test run times.

Cygwin Bash version is 5.2.21(1)-release, parallel tool isn't available, so I'm trying to use COPROC.

The resources found on the web are unclear, whether or not multiple independent COPROC instances should be possible from version >5.0 (despite warnings about process still exists) - here, it didn't seem to work and to be on the safe side, I've restricted my code to trying to run just one co-process in parallel.

In the following example, an array of values is set up and filtered in two different ways in the function loopedElapsedTimeCalculation().

This method is called two times in succession, inside each loop iteration, 1st with a IS_COP flag, by initializing the standard COPROC, then calculating and collecting its result times outside the function (in a nested loop), 2nd by a regular call of the function, separately from the COPROC and collecting the times, calculated from inside the method.

The results for the different filtering approaches are finally being summed up in its corresponding associative array cells, which should hold the added-up elapsed times from the method's parallel invocations, both in the main process and COPROC:

testMethods[PARAM_SUBST_ARRAY]
testMethods[AWK_REGEX_FROM_ARRAY]

Here the example test code:

#! /bin/bash

shopt -s extglob 

# Define amount of test array value sets to generate and test iteration loops to run
declare -i i sets=1 loops=2
declare -a tmpArr elapsedTestTimes 

# Associative array for collecting measured test approach run time sums
declare -A testMethods
testMethods[PARAM_SUBST_ARRAY]=
testMethods[AWK_REGEX_FROM_ARRAY]=

# Initializing test array
while [[ $((++i)) -le $sets ]]; do 
    tmpArr+=("?? exec2.bin[$i]"\
    "A  file00 0.bin[$i]"\
    " A file11*1.bin[$i]"\
    "MR file22\03457zwei.bin[$i]"\
    " C file33\t3.bin[$i]"\
    "T  file44\$4.bin[$i]"\
    'D  file55"$(echo EXE)"5.bin['$i']'\
    " R101 renamedW1[$i]"\
    "R102  renamedI2[$i]"\
    "R104R104 myproject/src/test/util/MyUtil2Test.java[$i]")
done

# Measures the elapsed stop/start times for different filtering approaches on the test array.
# When called without arg1 flag, it calculates the delta, summing it up in the corresponding 
# 'testMethods' assoc array cells, for each filter measured. 
# Without flag, only the plain '<TEST_METHODS_KEY>=$stopTime-$startTime' value string is 
# returned to the caller, which has to calculate and add the measured times to the corresponding 
# 'testMethods' assoc array cells of the coproc async invocation
loopedElapsedTimeCalculation() {

    #[[ $1 ]] && echo -e "$1\n" >/dev/tty
    
    local stopTime1A stopTime1B startTime1A startTime1B IFS _IFS
    
    # COPROC invocation (external calc, sum and set of test run times)
    if [[ $1 ]]; then
        ### Perf-test filtering ARRAY via Pattern Matching in Parameter Substitution 
        printf 'COP - PARAM_SUBST_ARRAY__OLD_VAL: %f\n' "${testMethods[PARAM_SUBST_ARRAY]}" >/dev/tty
        startTime1A="$EPOCHREALTIME"
        printf '%s\n' "${tmpArr[@]/#[? ]*}" >/dev/null
        stopTime1A="$EPOCHREALTIME"
        printf '%s\n' "PARAM_SUBST_ARRAY=$stopTime1A-$startTime1A" 
        printf 'COP - PARAM_SUBST_ARRAY__NEW_TMP: %f\n' "$(awk 'BEGIN { printf '"$stopTime1A"' - '"$startTime1A"'; }')" >/dev/tty
        
        ### Perf-test filtering ARRAY via AWK extended RegEx 
        printf 'COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: %f\n' "${testMethods[AWK_REGEX_FROM_ARRAY]}" >/dev/tty
        startTime2A="$EPOCHREALTIME"
        _IFS="$IFS"; IFS=$'\n'
        awk '/^[^ ?].*/ { print }' <<<"${tmpArr[*]}" >/dev/null 
        IFS="$_IFS"
        stopTime2A="$EPOCHREALTIME"
        printf '%s\n' "AWK_REGEX_FROM_ARRAY=$stopTime2A-$startTime2A" 
        printf 'COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: %f\n' "$(awk 'BEGIN { printf '"$stopTime2A"' - '"$startTime2A"'; }')" >/dev/tty
    # Regular invocation (implicit calc, sum and set of test run times)
    else
        ### Perf-test filtering ARRAY via Pattern Matching in Parameter Substitution 
        printf 'NO_COP - PARAM_SUBST_ARRAY__OLD_VAL: %f\n' "${testMethods[PARAM_SUBST_ARRAY]}" >/dev/tty
        startTime1B="$EPOCHREALTIME"
        printf '%s\n' "${tmpArr[@]/#[? ]*}" >/dev/null
        stopTime1B="$EPOCHREALTIME"
        read -r PARAM_SUBST_ARRAY < <(
            awk 'BEGIN { printf "%f", '"${testMethods[PARAM_SUBST_ARRAY]}"' + '"$stopTime1B"' - '"$startTime1B"'; }'
        )
        testMethods[PARAM_SUBST_ARRAY]="$PARAM_SUBST_ARRAY"
        printf 'NO_COP - PARAM_SUBST_ARRAY__NEW_TMP: %f\n' "$(awk 'BEGIN { printf '"$stopTime1B"' - '"$startTime1B"'; }')" >/dev/tty
        printf 'NO_COP - PARAM_SUBST_ARRAY__NEW_VAL: %f\n' "${testMethods[PARAM_SUBST_ARRAY]}" >/dev/tty
        
        ### Perf-test filtering ARRAY via AWK extended RegEx 
        printf 'NO_COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: %f\n' "${testMethods[AWK_REGEX_FROM_ARRAY]}" >/dev/tty
        startTime2B="$EPOCHREALTIME"
        _IFS="$IFS"; IFS=$'\n'
        awk '/^[^ ?].*/ { print }' <<<"${tmpArr[*]}" >/dev/null 
        IFS="$_IFS"
        stopTime2B="$EPOCHREALTIME"
        read -r AWK_REGEX_FROM_ARRAY < <(
            awk 'BEGIN { printf "%f", '"${testMethods[AWK_REGEX_FROM_ARRAY]}"' + '"$stopTime2B"' - '"$startTime2B"'; }'
        )
        testMethods[AWK_REGEX_FROM_ARRAY]="$AWK_REGEX_FROM_ARRAY"
        printf 'NO_COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: %f\n' "$(awk 'BEGIN { printf '"$stopTime2B"' - '"$startTime2B"'; }')" >/dev/tty
        printf 'NO_COP - AWK_REGEX_FROM_ARRAY__NEW_VAL: %f\n' "${testMethods[AWK_REGEX_FROM_ARRAY]}" >/dev/tty
    fi
}

i=0
# Should repeat test runs $loops times, with 2 independently running async invocations 
# of loopedElapsedTimeCalculation() each time, to reduce large-test run times
while [[ $((i++)) -lt $loops ]]; do
    
    # Initialize COPROC to run test method
    coproc { loopedElapsedTimeCalculation 'IS_COP'; }

    # Retrieve and process KEY/TIME value results while COPROC returns them
    # --> should run asynchronously, but doesn't !
    while read; do 
        for m in "${!testMethods[@]}"; do
            if [[ "${REPLY%=*}" == "$m" ]]; then
                printf 'M: %s\n' "$m" >/dev/tty;
                sleep 1
                read -d $'\n' -r tmpTime < <(
                    awk 'BEGIN { printf "%f", '"${testMethods[$m]}"' + '"${REPLY#*=}"'; }';
                )
                testMethods["$m"]="$tmpTime"
            fi
        done
        printf 'COP - %s__NEW_VAL: %f\n' "$m" "${testMethods[$m]}" >/dev/tty;
    done <&"$COPROC"

    # Regular test method run
    loopedElapsedTimeCalculation 
    
    # Wait for COPROC to finish, before new async test iteration can start
    wait $COPROC_PID
done 

echo -e >/dev/tty

for m in "${!testMethods[@]}"; do
    printf '%s SUM: %f\n' "$m" "${testMethods[$m]}" >/dev/tty
done 

I've been trying to adapt a modified example with continuous retrieval of COPROC results, like here:

while read output <&"${COPROC[0]}"; do echo $output; done

As using this inside the loop/with looped awk call would result in the error

"$COPROC": Bad file descriptor

I've changed it to the non-failing variant

while read; do ... done <&"$COPROC"

So far, this seems to work, but the asynchronous parallelization will not:

COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000000
M: PARAM_SUBST_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000105
COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.000000
COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.043876
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000105
M: AWK_REGEX_FROM_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000105
NO_COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000105
NO_COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000100
NO_COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000205
NO_COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.043876
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.043439
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_VAL: 0.087315
COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000205
M: PARAM_SUBST_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000102
COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.087315
COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.044305
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000307
M: AWK_REGEX_FROM_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000307
NO_COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000307
NO_COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000090
NO_COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000397
NO_COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.131620
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.041941
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_VAL: 0.173561

The inserted debug output and sleep 1 calls inside the nested result retrieval loop for COPROC show, that main and COPROC do not run asynchronously, instead main is only invoked each time, after the COPROC has finished - so this runs synchronized.

When putting the tegular test method call to loopedElapsedTimeCalculation before the nested while loop (if extracted into a separate method, too), the result seems to show the intended behavior of asynchronous execution:

NO_COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000000
COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000000
COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000127
COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.000000
NO_COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000088
NO_COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000088
NO_COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.000000
COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.055613
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.045002
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_VAL: 0.045002
M: PARAM_SUBST_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000215
M: AWK_REGEX_FROM_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000215
NO_COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000215
COP - PARAM_SUBST_ARRAY__OLD_VAL: 0.000215
COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000077
COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.100615
NO_COP - PARAM_SUBST_ARRAY__NEW_TMP: 0.000074
NO_COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000289
NO_COP - AWK_REGEX_FROM_ARRAY__OLD_VAL: 0.100615
COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.106964
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_TMP: 0.042208
NO_COP - AWK_REGEX_FROM_ARRAY__NEW_VAL: 0.142823
M: PARAM_SUBST_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000366
M: AWK_REGEX_FROM_ARRAY
COP - PARAM_SUBST_ARRAY__NEW_VAL: 0.000366

AWK_REGEX_FROM_ARRAY SUM: 0.249787
PARAM_SUBST_ARRAY SUM: 0.000366

But now, intermittently a race condition (how exactly?) occurs, sometimes for one, or for both of COPROC's access:

"$COPROC": Bad file descriptor

Why is that and how to change the code, so the asynchronous test times calculation works as intended?

Of course I can move the nested while loop inside the COPROC, but then the changes to the testmethods array aren't propagated to the main process, which was the very intention of using a co-process, instead of just detaching it, via & and using an explicit global or export var in main won't work with sub-processes, either.

2
  • 2
    Check stackoverflow.com/help/minimal-reproducible-example Commented Feb 25 at 22:47
  • I've already considerably simplified from the code actually in use, providing a working example for which I hoped to have provided sufficiently explaining comments, so others can use it. If I find the time, I'll try to reduce it further - but most of the (printf) code which may be distractive is just for test data setup and debugging output anyway. Commented Feb 26 at 21:46

2 Answers 2

0

The question is describing several different problems. I'll try to help with one of them. If I understand it correctly, code like this:

coproc { loopedElapsedTimeCalculation 'IS_COP'; }

loopedElapsedTimeCalculation 

while read; do 
    ...
done <&"$COPROC"

is producing the error "$COPROC": Bad file descriptor. That occurs because Bash closes the file descriptors for a coprocess as soon as it detects that it has exited. In this case, the coprocess must be exiting while loopedElapsedTimeCalculation is running.

One possible solution is to open the file descriptor immediately after starting the coprocess and keep it open until all the data has been read from it. One way to do that is:

coproc { loopedElapsedTimeCalculation 'IS_COP'; }

{
    loopedElapsedTimeCalculation 

    while read; do 
        ...
    done
} <&"$COPROC"

That is still vulnerable to a race condition (if the coprocess runs to completion before the following block is executed), but it is much less likely to happen.

A better solution is to use process substitution instead of a coprocess:

{
    loopedElapsedTimeCalculation 

    while read; do 
        ...
    done
} < <(loopedElapsedTimeCalculation 'IS_COP')

There's no need for a coprocess because the program doesn't need to write data to the subprocess. The coprocess mechanism in Bash has many limitations and is generally best avoided anyway.

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks for replying pjh. Your 1st suggestion would invoke loopedElapsedTimeCalculation though each time, it's already returning result lines from its invocations inside COPROC, if I understand correctly. It also shows in the execution and summed-up times. Your 2nd proposal of a Process Substitution, has the same problem, and it eliminates the initial purpose of asynchronous, parallel execution, the very reason I've chosen COPROC in the first place. As (hopefully) clarified above, the goal was to run 2 parallel processes for each external loop iteration, to speed things up.
0

The easiest way to fix this is to not use the COPROC[0] and COPROC[1[ file descriptor. redirect the coprocs stdin / stdout to anonymous pipes.

{ coproc {  loopedElapsedTimeCalculation 'IS_COP'; } <&$fd0 >&$fd1 2>&$fd2; } {fd0}<><(:) {fd1}<><(:) {fd2}>&2

while read -r -u $fd1; do
...
done

You can then send the coproc stuff by writing to &$fd0 (if needed) and read the output by reading from &$fd1

To make parallel execution really easy and efficient, id suggest checking out my forkrun utility. It uses persistent bash coprocs under the hood. You can even set it up to be a persistent async process that you send commands to and it will run them on demand and then quietly wait for more. e.g., something like

{ coproc fr {
    forkrun -N <&$fd0 >&$fd1 2>&$fd2;
  }
} {fd0}<><(:) {fd1}<><(:) {fd2}>&2

echo "loopedElapsedTimeCalculation 'IS_COP'" >&$fd0
# do stuff while that runs
read -r -u $fd1 output

4 Comments

Thanks for your suggestion @jkool702 - could you please explain in detail, how the redirection part exactly works - it looks quite arcane to me - so I can understand and adapt it to my needs?
(1/2) sure. this involves a few parts: 1) in bash, doing { cmds; } {fd}>&__ will setup the {fd}>&__ redirect so it is available for stuff run inside the brace group. its sort of like doing exec {fd}>&__; { cmds; } 2) setting up a redirect {fd}<><(:) will open up an anonymous pipe at file descriptor $fd. you can read from and write to this pipe. note: the pipe has a finite-size pipe buffer - if the pipe has unread data buffered in excess of the pipe buffer then it will block things trying to write to it.
(2/2) So, 1) the outer brace group ({ ... } {fd0}<><(:) {fd1}<><(:) {fd2}>&2) opens up anonymous pipes at $fd0 and $fd1, and redirects $fd2 to stderr (&2). $fd0 and $fd1 will replace ${COPROC[1]} and ${COPROC[0]} 2) the inner brace group + coproc argument (coproc { ... } <&$fd0 >&$fd1 2>&$fd2;) redirects {fd0}, {fd1} and {fd2} to the stdin / stdout / stderr of the coproc.
Thanks for your input, jkool702, I'll give it another try with your suggestions, when I have time.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.