
I have a few coroutines running in one process (A) and one heavier, unbounded job running in a separate process (B). I would like the heavier job to dispatch its results into a queue that is consumed by the original process (A).

Similar to this:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def process__heavy(pipe):
    print("[B] starting...")
    while True:
        print(f"[B] Pipe queue: {pipe.qsize()}")
        pipe.put_nowait(str(time.time()))
        time.sleep(0.5)

async def coroutine__stats(pipe):
    print("[A] starting...")
    while True:
        print(f"[A] Pipe queue: {pipe.qsize()}")
        await asyncio.sleep(1)

  
async def main():
    pipe = asyncio.Queue()
    executor = ProcessPoolExecutor()

    jobs = await asyncio.gather(
        asyncio.get_running_loop().run_in_executor(executor, process__heavy, pipe),
        coroutine__stats(pipe)
    )

    print(f"Finished with result: {jobs.result()}")


if __name__ == '__main__':
    asyncio.run(main())
    print("Bye.")

Output:

[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 0 <--- why zero?
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 0 <---
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 0 <---
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 0
[B] Pipe queue: 8

The original process (A) never sees any data put into the shared queue. I do not remember whether Python can actually share objects across processes, or whether everything is pickled, so the only result you can get back is the value a process returns when it exits.

What am I doing wrong, and what would be the best way to create a data pipe between these two processes?

  • I am afraid the asyncio.Queue is not optimized for multiprocessing. You can use queue.Queue instead. Commented Oct 4, 2022 at 6:36
  • @ArtyomVancyan No, queue.Queue doesn't handle multiprocessing, but multiprocessing.Manager().Queue() does. Regular multiprocessing.Queue won't pickle due to the way it is passed (a minimal repro is sketched just after these comments). Commented Oct 4, 2022 at 6:43
  • @ArtyomVancyan on a different note - nice name! Similar to mine, haha Commented Oct 4, 2022 at 7:02
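
The pickling failure mentioned above is easy to reproduce. A minimal sketch, assuming CPython's standard multiprocessing behavior (the worker function here is hypothetical, for illustration only):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def worker(q):
    q.put("hello")  # never runs: the queue cannot be pickled for the pool


if __name__ == "__main__":
    q = mp.Queue()
    with ProcessPoolExecutor() as executor:
        future = executor.submit(worker, q)
        # Pickling the queue for the executor's call queue fails, and the
        # error surfaces on the future, roughly:
        # RuntimeError: Queue objects should only be shared between
        # processes through inheritance
        future.result()

multiprocessing.Queue is designed to be inherited by child processes when they are forked/spawned; ProcessPoolExecutor instead pickles arguments and ships them to already-running workers, which mp.Queue deliberately refuses.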

1 Answer


Use a multiprocessing.Manager() to create the queue instead of asyncio.Queue. An asyncio.Queue is an ordinary in-process object: when it is pickled and sent to the worker process, the worker operates on its own copy, which is why process A keeps seeing zero. A Manager().Queue() is a picklable proxy to a queue living in the manager's server process, so both processes talk to the same underlying queue:

import multiprocessing as mp
# ...
pipe = mp.Manager().Queue()

With that change to the OP code:

[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 2
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 4
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 6
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 8
[B] Pipe queue: 8
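
For completeness, a sketch of the full program with that single change applied (otherwise identical to the question's code; both loops run forever, so the final print is never reached):

import asyncio
import multiprocessing as mp
import time
from concurrent.futures import ProcessPoolExecutor


def process__heavy(pipe):
    print("[B] starting...")
    while True:
        print(f"[B] Pipe queue: {pipe.qsize()}")
        pipe.put_nowait(str(time.time()))
        time.sleep(0.5)


async def coroutine__stats(pipe):
    print("[A] starting...")
    while True:
        print(f"[A] Pipe queue: {pipe.qsize()}")
        await asyncio.sleep(1)


async def main():
    # A Manager().Queue() proxy pickles cleanly, so it can be passed
    # to the executor worker; both processes share one underlying queue
    pipe = mp.Manager().Queue()
    executor = ProcessPoolExecutor()

    jobs = await asyncio.gather(
        asyncio.get_running_loop().run_in_executor(executor, process__heavy, pipe),
        coroutine__stats(pipe)
    )

    print(f"Finished with result: {jobs}")


if __name__ == '__main__':
    asyncio.run(main())
    print("Bye.")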

2 Comments

  • Thanks, this works! Where did you find documentation showing that this specific Queue implementation must be used?
  • The docs for asyncio.Queue say "This class is not thread safe.", so logically I changed it to multiprocessing.Queue. That didn't work, but from experience I knew that multiprocessing.Manager().Queue() is more flexible.
