
I'd like to use asyncio to do a lot of simultaneous non-blocking IO in Python. However, I want that use of asyncio to be abstracted away from the user: under the hood there are a lot of asynchronous calls going on simultaneously to speed things up, but to the user it looks like a single, synchronous call.

Basically something like this:

async def _slow_async_fn(address):
    data = await async_load_data(address)
    return data

def synchronous_blocking_io():
    addresses = ...
    tasks = []
    for address in addresses:
        tasks.append(_slow_async_fn(address))
    all_results = some_fn(asyncio.gather(*tasks))
    return all_results

The problem is, how can I achieve this in a way that's agnostic to the user's running environment? If I use a pattern like asyncio.get_event_loop().run_until_complete(), I run into issues when the code is called inside an environment like Jupyter where there's already an event loop running. Is there a way to robustly gather the results of a set of asynchronous tasks that doesn't require pushing async/await statements all the way up the program?
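For illustration, here is a minimal, self-contained reproduction of the conflict described above (the function names are made up for the demo): calling run_until_complete() on a loop that is already running raises a RuntimeError, which is exactly what happens when synchronous wrapper code runs inside Jupyter.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    try:
        # Re-entering the already-running loop is forbidden
        loop.run_until_complete(asyncio.sleep(0))
    except RuntimeError as exc:
        return exc
    return None


# asyncio.run() starts a loop, simulating an environment like Jupyter
error = asyncio.run(demo())
print(type(error).__name__)  # RuntimeError
```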

  • Running a blocking call inside an active event loop is going to conflict with it anyway. Why try and reuse something you cannot reuse? Start a new event loop and use that. Commented Jul 6, 2022 at 18:36
  • @MisterMiyagi Can you give an example? I believe that starting a new event loop also conflicts with the Jupyter environment Commented Jul 6, 2022 at 19:19

1 Answer


The restriction on running loops is per thread, so running a new event loop is possible, as long as it is in a new thread.

import asyncio
import concurrent.futures


async def gatherer_of(tasks):
    # asyncio.gather() needs a running event loop to schedule its tasks,
    # so it must be called from inside a coroutine
    return await asyncio.gather(*tasks)


def synchronous_blocking_io():
    addresses = ...
    tasks = []
    for address in addresses:
        tasks.append(_slow_async_fn(address))

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gatherer_of(tasks))
    finally:
        loop.close()  # release the loop's resources


def synchronous_blocking_io_wrapper():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(synchronous_blocking_io)
        return fut.result()


# Testing
async def async_runner():
    # Simulating execution from a running loop
    return synchronous_blocking_io_wrapper()

# Run from synchronous client
# print(synchronous_blocking_io_wrapper())

# Run from async client
# print(asyncio.run(async_runner()))

The same result can be achieved with a ProcessPoolExecutor, by manually running synchronous_blocking_io in a new thread and joining it, by starting an entirely new process, and so on. As long as you are not in the same thread, you won't conflict with any running event loop.
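As a sketch of the direct-thread variant mentioned above (a plain threading.Thread instead of the pool machinery; _slow_async_fn is a stand-in here, and asyncio.run() replaces the manual loop management since each thread gets its own loop):

```python
import asyncio
import threading


async def _slow_async_fn(address):
    # Stand-in for real asynchronous I/O
    await asyncio.sleep(0)
    return address * 2


def synchronous_blocking_io(addresses):
    async def gather_all():
        return await asyncio.gather(*(_slow_async_fn(a) for a in addresses))
    # asyncio.run() creates a fresh loop in the current thread
    return asyncio.run(gather_all())


def run_in_thread(addresses):
    result = {}

    def target():
        result["value"] = synchronous_blocking_io(addresses)

    t = threading.Thread(target=target)
    t.start()
    t.join()  # note: this blocks the caller (and its loop, if any)
    return result["value"]


print(run_in_thread([1, 2, 3]))  # [2, 4, 6]
```

The join() is what the comments below are debating: it keeps the wrapper synchronous, at the cost of stalling any event loop running in the calling thread.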


3 Comments

"As long as you are not in the same thread, you won't conflict with any running event loop." That's really not true for even practical cases. Waiting for the thread with the auxiliary thread to complete will completely block the original event loop. While this will not damage the event loop, it will interfere with any and all of its tasks.
Note that there is little point in using a thread/process pool to just run a single task and wait for its result. One can directly spawn a single thread/process without the entire pool machinery instead.
On the contrary, the statement is true within the scope of the OP's question. He asked for a solution that was agnostic to the running environment. That implies that, regardless of whether there is a running loop, it is his intention to run _slow_async_fn asynchronously. My proposed solution does just that. The fact that an original running loop would be blocked by joining a new thread from it, doesn't seem to be a factor, so much so that his function is called synchronous_blocking_io. I also added that one can use a Thread or a Process directly, see bottom of my answer.
