My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor - however, a slight change (I believe it's the async for) makes the excellent answers there unusable for me.

I am trying the following MWE:

import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

The output from this, as expected, is not asynchronous:

MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I would like to adjust the code so that the tasks run in two concurrent processes, printing output as it becomes available. The desired output is:

MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as things stand, awaiting loop.run_in_executor() directly suspends the coroutine until the result is ready, so the async for cannot advance. I don't know how to replace it in a way that allows the async for to move on to the next generated value while the executor finishes its work. Note that I am not using asyncio.gather as in their example.

1 Answer

If you want to have a maximum of two processes running your tasks, the simplest way to achieve that is to create the executor with max_workers=2. Then you can submit tasks as fast as possible, i.e. proceed with the next iteration of async for without waiting for the previous task to finish. You can gather the results of all tasks at the end, to ensure the exceptions don't go unnoticed (and possibly to get the actual results).

The following code produces the expected output:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)

async def run_blocking(executor, task_no, delay):
    print('MASTER: Sending to executor blocking task number '
          + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def non_blocking(loop):
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # spawn the task and let it run in the background
            tasks.append(asyncio.create_task(
                run_blocking(executor, i + 1, i)))
        # if there was an exception, retrieve it now
        await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

4 Comments

How would you restructure this when mygen is yielding from a queue that never stops yielding? (producer/consumer pattern). I can't think of a nice way to let the process pool executor take what it needs from the queue, throttled by max_workers
@John You could create a Semaphore, pass it to run_blocking(), and acquire it around the await.
Ah I see, so effectively throttling the calls to the ProcessPoolExecutor in asyncio rather than relying on the pool to do the work? - Thanks for your reply btw!
@John Come to think of it, this won't work either, if mygen() is generating items faster than the executor can execute them. I guess for that you need a queue leading to a pool of async workers.
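
To illustrate that last suggestion, here is a minimal sketch of the bounded-queue pattern (the worker and main names, the queue size, and the u=10 cap on mygen are assumptions for illustration, not code from the answer above). The producer suspends on queue.put() whenever the queue is full, so even an endless mygen() cannot outrun the pool:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 10):
    # stand-in for an endless producer; bounded so the sketch terminates
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)

async def worker(queue, executor, loop):
    # each worker handles one item at a time, so at most
    # len(workers) executor jobs are in flight
    while True:
        i = await queue.get()
        try:
            print(await loop.run_in_executor(executor, blocking, i))
        finally:
            queue.task_done()

async def main(loop):
    queue = asyncio.Queue(maxsize=2)  # bounded: applies back-pressure
    with ProcessPoolExecutor(max_workers=2) as executor:
        workers = [asyncio.create_task(worker(queue, executor, loop))
                   for _ in range(2)]
        async for i in mygen():
            await queue.put(i)        # suspends while the queue is full
        await queue.join()            # wait for in-flight items to finish
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

With this arrangement the number of pending items is capped by the queue's maxsize and the number of running jobs by the worker count, regardless of how fast the generator yields.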
