
I have a device that needs multiprocessing to handle the CPU-bound deserialization and decoding of its incoming data; the rest of the application is slower, IO-bound code, which is an excellent fit for asyncio. However, there seems to be no good way to combine multiprocessing and asyncio.

I have tried https://github.com/dano/aioprocessing, which uses threaded executors for multiprocessing operations. However, this library does not natively support common asyncio operations; for example, cancelling a coroutine waiting on a queue.get with this library will deadlock.

I have also tried to use a ProcessPoolExecutor, but passing multiprocessing objects to this executor does not work, since the queue objects are not passed at the creation of the process.

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    for p in multiprocessing.active_children():
        p.kill()


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)

    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

Running this code leads to this exception:

RuntimeError: Queue objects should only be shared between processes through inheritance
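The error is about inheritance versus pickling: a multiprocessing.Queue must reach a child process at creation time (inheritance from the parent), whereas run_in_executor pickles its arguments and sends them to an already-running pool worker. For contrast, a minimal sketch of the inheritance path that the error message refers to, which does work:

```python
import multiprocessing


def worker(queue: multiprocessing.Queue) -> None:
    # The queue arrived via process inheritance, so using it here is allowed.
    queue.put("hello from child")


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    # Passing the queue in args at process creation is inheritance, not pickling.
    child = multiprocessing.Process(target=worker, args=(queue,))
    child.start()
    print(queue.get())
    child.join()
```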

Is there any way to cleanly bridge the gap between multiprocessing and asyncio?

  • Do the answers here help? Might be a duplicate Commented Jul 9, 2019 at 1:53
  • Yes, that does, thank you @new-dev-123. I was missing a multiprocessing.Manager() to make the code work. Commented Jul 9, 2019 at 2:16

2 Answers


Per the question "Can I somehow share an asynchronous queue with a subprocess?":

The above code can be modified to work with a multiprocessing queue by creating the queue through a multiprocessing.Manager():

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    for p in multiprocessing.active_children():
        p.kill()


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)
    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())
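A related observation, beyond the answer above: when the CPU-bound step only needs to take input and return output, the queue can be skipped entirely, because run_in_executor pickles ordinary arguments and return values on its own. A sketch of that variant, where decode is a hypothetical stand-in for the real deserialization/decoding step:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def decode(raw: bytes) -> str:
    # Hypothetical stand-in for the CPU-bound deserialization/decoding step.
    return raw.decode("utf-8").upper()


async def main() -> list:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each chunk is pickled to a worker process; awaiting the futures
        # keeps the event loop free for the IO-bound parts of the app.
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, decode, chunk)
              for chunk in (b"alpha", b"beta", b"gamma"))
        )
    print(results)
    return results


if __name__ == "__main__":
    asyncio.run(main())
```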

Here is an async function, run_subprocess_with_callback, which takes an async function and runs it in a ProcessPoolExecutor. If run_subprocess_with_callback is cancelled, the ProcessPoolExecutor is terminated as part of the async cancellation.

run_subprocess_with_callback also takes a callback function for messaging back to the main process. Internally, this callback function is implemented with a Manager Queue.

https://pyedifice.github.io/utilities.html#edifice.utilities.run_subprocess_with_callback
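As an illustration only (the worker, sentinel, and function names below are assumptions, not edifice's actual implementation), the general pattern described above, progress callbacks flowing back over a Manager queue, with the pool torn down when the coroutine exits or is cancelled, can be sketched like this:

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def _worker(queue, n: int) -> str:
    # Runs in the subprocess; reports progress through the manager queue.
    for i in range(n):
        queue.put(i)
    queue.put(None)  # sentinel: no more messages
    return "finished"


async def run_with_callback(n: int, callback) -> str:
    manager = multiprocessing.Manager()
    queue = manager.Queue()  # manager proxies pickle, unlike multiprocessing.Queue
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    try:
        worker_future = loop.run_in_executor(executor, _worker, queue, n)
        while True:
            # Block on queue.get in a thread so the event loop stays responsive.
            msg = await loop.run_in_executor(None, queue.get)
            if msg is None:
                break
            callback(msg)
        return await worker_future
    finally:
        # On cancellation (or normal exit), shut the pool down rather than leak it.
        executor.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    asyncio.run(run_with_callback(3, print))
```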
