My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor; however, a slight change (I believe it's the async for) makes the excellent answers there unusable for me.
I am trying the following MWE:
import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
The output from this, as expected, is not asynchronous:
MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I would like to adjust the code so that the tasks run in two concurrent processes and the output is printed as it becomes available. The desired output is:
MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as things stand, awaiting loop.run_in_executor() inside the loop makes each iteration wait for its task to finish before the next one is sent. I don't know how to replace it in a way that allows the async for to move on to the next generated value while the executor finishes its work. Note that I am not using asyncio.gather as in their example.
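To make the goal concrete, here is a minimal sketch of the kind of pattern I am after, not a confirmed solution. It assumes each executor call can be wrapped in its own task with asyncio.create_task, so that the async for only schedules work and never awaits a result itself. The helper run_one and the executor parameter are names made up for illustration; mygen and blocking are the same as in the MWE.

```python
import asyncio
import concurrent.futures
import time


async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1


def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)


async def run_one(loop, executor, i):
    # Hypothetical helper: waits for ONE executor job and reports on it
    # as soon as that job finishes, independently of the other jobs.
    result = await loop.run_in_executor(executor, blocking, i)
    print(result)
    print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i + 1))
    return i + 1


async def non_blocking(executor):
    # Assumption: the caller creates the executor and shuts it down afterwards.
    loop = asyncio.get_running_loop()
    tasks = []
    async for i in mygen():
        print('MASTER: Sending to executor blocking task number ' + str(i + 1))
        # create_task only schedules the coroutine; nothing is awaited here,
        # so the loop can move straight on to the next generated value.
        tasks.append(asyncio.create_task(run_one(loop, executor, i)))
    # Wait until every scheduled task is done; each one has already printed
    # its own messages at the moment it completed.
    return [await task for task in tasks]
```

To try it with processes, something like `with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: asyncio.run(non_blocking(executor))` under an `if __name__ == '__main__':` guard should work. The same function also accepts a ThreadPoolExecutor, which is handy for a quick check.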