I am using Python 3.11.5 with the below code:

import asyncio
from collections.abc import AsyncIterable


# Leave this iterable be, the question is about
# how to use many instances of this in parallel
async def iterable() -> AsyncIterable[int]:
    yield 1
    yield 2
    yield 3


# How can one get multiple async iterables to work with asyncio.gather?
# In other words, since asyncio.gather works with asyncio.Task,
# (1) How can one convert an async iterable to a Task?
# (2) How can one use asyncio.gather to run many of these tasks in parallel,
#     keeping the results 1-1 with the source iterables?
results_1, results_2, results_3 = asyncio.gather(iterable(), iterable(), iterable())

To restate the question, how can one get:

  • An AsyncIterable as an asyncio task, where the task iterates until exhaustion
  • Run multiple of these tasks in parallel, storing the results on a per-task basis

(e.g. for use with asyncio.gather)?

I am looking for a 1-3 line snippet showing how to connect these dots.

4 Comments

  • Could you explain more clearly what you are trying to achieve? There's no obvious meaning for awaiting or asyncio.gather'ing an async generator: what should this produce? You yield numbers, so it's not about the async iterable producing new futures, right? Then what should happen: wait for the first item? Consume the generator entirely, as async for would do? Commented Dec 11, 2023 at 21:06
  • What does "running" a generator mean for you? It can be consumed using async for item in iterable() to read the values from it sequentially. You put this async-for loop into an async function and run it in any convenient way (entrypoint: asyncio.run). Commented Dec 12, 2023 at 0:27
  • Okay, I have expanded the question a bit to make it more clear @STerliakov. asyncio.run may be part of the answer. Thanks for asking for clarification! Commented Dec 12, 2023 at 5:26
  • I think you are looking for a way to get concurrency via asyncio for your methods or generators. I suggest checking this post out. Commented Dec 12, 2023 at 7:10

1 Answer

According to your code snippet, you are passing async generator objects (the results of calling iterable()) directly to asyncio.gather, but it expects awaitables (coroutines, Tasks, or Futures). One possible fix is to write a new coroutine that consumes an async iterable and collects its items into a list.

The code looks like this:

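async def collect(async_iterable):
    return [item async for item in async_iterable]

async def main():
    # create_task() needs a running event loop, so it is called inside a
    # coroutine, and asyncio.run() is given that coroutine rather than a Future
    return await asyncio.gather(
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable())),
    )

results = asyncio.run(main())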
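To see why the wrapper is needed, here is a minimal sketch of what I would expect on Python 3.11: asyncio.gather rejects a bare async generator object with a TypeError, while the same generator wrapped in collect() is accepted. The main() function and the outcome variable are names I made up for this illustration.

```python
import asyncio
from collections.abc import AsyncIterator


async def iterable() -> AsyncIterator[int]:
    yield 1
    yield 2
    yield 3


async def collect(async_iterable):
    return [item async for item in async_iterable]


async def main() -> tuple[str, list[int]]:
    try:
        # An async generator object is not awaitable, so gather() refuses it
        await asyncio.gather(iterable())
        outcome = "accepted"
    except TypeError:
        outcome = "TypeError"
    # Wrapped in a coroutine, the same generator can be scheduled
    (wrapped,) = await asyncio.gather(collect(iterable()))
    return outcome, wrapped


outcome, wrapped = asyncio.run(main())
print(outcome, wrapped)
```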

With collect(), you get three [1, 2, 3] lists, but there is still a problem: the iterable() instances are not consumed concurrently. Adding a print inside collect() shows that they run one after another. The event loop can only switch between tasks at an await that actually suspends, and this generator never suspends, so each task runs to completion before the next one starts. To interleave the iterables, the generator needs at least one such await. Here await asyncio.sleep(0) works as a trick, because it hands control back to the event loop once.
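The sequential behaviour can be checked with a small sketch like the one below. It keeps the generator from the question unchanged (no await inside) and records the consumption order in a list instead of printing. The events list and the task_id parameter are hypothetical helpers added only for this demonstration.

```python
import asyncio
from collections.abc import AsyncIterator

# (task id, item) pairs, appended in the order the items are consumed
events: list[tuple[int, int]] = []


async def iterable() -> AsyncIterator[int]:
    # Same shape as the question's generator: no await, so it never suspends
    yield 1
    yield 2
    yield 3


async def collect(task_id: int) -> list[int]:
    result = []
    async for item in iterable():
        events.append((task_id, item))
        result.append(item)
    return result


async def main() -> list[list[int]]:
    # gather() wraps each coroutine in a Task, scheduled in argument order
    return await asyncio.gather(*(collect(i) for i in range(3)))


results = asyncio.run(main())
print(events)
```

Every item of task 0 is recorded before any item of task 1, and so on, even though all three tasks were handed to gather at once.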

Here's the whole code:

import asyncio
from collections.abc import AsyncIterable

async def iterable() -> AsyncIterable[int]:
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def collect(async_iterable: AsyncIterable[int]) -> list:
    result = []
    async for item in async_iterable:
        print(item)
        result.append(item)
    return result

async def main():
    tasks = [
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable()))
    ]
    results = await asyncio.gather(*tasks)
    return results

results_1, results_2, results_3 = asyncio.run(main())
print(results_1, results_2, results_3)

Out:

1
1
1
2
2
2
3
3
3
[1, 2, 3] [1, 2, 3] [1, 2, 3]
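On keeping the results 1-1 with the source iterables: asyncio.gather returns its results in the order the awaitables were passed in, not in the order they finish. Below is a rough sketch of that. The numbers() generator and its count and delay parameters are invented for this example, so the three sources take different amounts of time.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(count: int, delay: float) -> AsyncIterator[int]:
    # Hypothetical generator: yields 1..count, sleeping before each item
    for i in range(1, count + 1):
        await asyncio.sleep(delay)
        yield i


async def collect(async_iterable):
    return [item async for item in async_iterable]


async def main():
    return await asyncio.gather(
        collect(numbers(3, 0.02)),  # finishes last
        collect(numbers(2, 0.0)),   # finishes first
        collect(numbers(1, 0.01)),
    )


slow, fast, single = asyncio.run(main())
print(slow, fast, single)
```

The unpacking on the last lines matches the argument order, so the slowest source still lands in the first variable.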