Is there an equivalent to itertools.chain(*iterables) that works with async iterators? One key requirement is that an item is yielded as soon as any of the async iterators produces it (i.e. no naive chaining, where each iterator is drained before the next one starts).

Update: One key difference from the duplicate question is that the answer below lets you identify which async generator produced each value.
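For contrast, here is a minimal sketch of the "naive chaining" the question wants to avoid. The names `ticker` and `naive_chain` are made up for illustration. Because each source is drained before the next one is touched, the fast source's items only show up after the slow source has finished.

```python
import asyncio

async def ticker(name, delay, count):
    """Hypothetical async generator: yields `name` every `delay` seconds."""
    for _ in range(count):
        await asyncio.sleep(delay)
        yield name

async def naive_chain(*sources):
    """Sequential chaining: drains each async iterator before starting the next."""
    for source in sources:
        async for item in source:
            yield item

async def collect_naive():
    # "fast" could deliver almost immediately, but it is not even started
    # until "slow" is exhausted.
    chained = naive_chain(ticker("slow", 0.02, 2), ticker("fast", 0.001, 2))
    return [item async for item in chained]

naive_order = asyncio.run(collect_naive())
print(naive_order)  # ['slow', 'slow', 'fast', 'fast']
```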

1 Answer

The following piece of code solves this problem:

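import asyncio

class InternalStopAsyncIteration(Exception):
    """A special stop exception that also carries the finished generator's key."""
    def __init__(self, key):
        self.key = key

async def tagged_anext(key, gen):
    """Return (key, next item). Named so it does not shadow the built-in anext() (Python 3.10+)."""
    try:
        return key, await gen.__anext__()
    except StopAsyncIteration:
        raise InternalStopAsyncIteration(key)

async def combine_async_generators(**gens):
    # asyncio.wait() needs tasks or futures; bare coroutines are rejected since Python 3.11.
    pending = {asyncio.ensure_future(tagged_anext(key, gen)) for key, gen in gens.items()}
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            exc = task.exception()
            if isinstance(exc, InternalStopAsyncIteration):
                gens.pop(exc.key)  # this generator is exhausted
            else:
                key, val = task.result()  # re-raises any other exception
                # Schedule the next item from the same generator before yielding.
                pending.add(asyncio.ensure_future(tagged_anext(key, gens[key])))
                yield key, val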

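# The following will typically print (a and b are both due around t=1.0,
# so the order of that pair is not strictly guaranteed):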
# a 0.5
# b 1
# a 0.5
# a 0.5
# b 1
# b 1
async def gen(x):
    """An async generator that sleeps a bit, then yields the given value."""
    for i in range(3):
        await asyncio.sleep(x)
        yield x

async def run():
    async for k, v in combine_async_generators(a=gen(0.5), b=gen(1)):
        print(k, v)

asyncio.run(run())
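One thing the code above does not cover is a consumer that stops early: the tasks still waiting on the other generators would be left running. Below is a rough sketch of one way to handle that, not a definitive implementation. The names `merge_with_cleanup`, `_step`, `_DONE` and `ticker` are invented for this example, and it swaps the custom exception for a sentinel value to mark an exhausted generator.

```python
import asyncio

_DONE = object()  # sentinel marking an exhausted generator (illustrative name)

async def _step(key, gen):
    """Fetch the next item from `gen`, tagged with its key."""
    try:
        return key, await gen.__anext__()
    except StopAsyncIteration:
        return key, _DONE

async def merge_with_cleanup(**gens):
    """Yield (key, value) pairs as they arrive; cancel leftover tasks on exit."""
    pending = {asyncio.ensure_future(_step(k, g)) for k, g in gens.items()}
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                key, value = task.result()
                if value is _DONE:
                    continue  # this generator is finished, do not reschedule it
                pending.add(asyncio.ensure_future(_step(key, gens[key])))
                yield key, value
    finally:
        # Reached on normal exhaustion, on errors, and when the consumer closes us.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

async def ticker(value, delay):
    """Hypothetical endless source: yields `value` every `delay` seconds."""
    while True:
        await asyncio.sleep(delay)
        yield value

async def take_two():
    merged = merge_with_cleanup(fast=ticker("tick", 0.01), slow=ticker("tock", 5))
    seen = []
    try:
        async for key, value in merged:
            seen.append((key, value))
            if len(seen) == 2:
                break
    finally:
        await merged.aclose()  # run the generator's finally block right away
    return seen

first_two = asyncio.run(take_two())
print(first_two)  # [('fast', 'tick'), ('fast', 'tick')]
```

Calling `aclose()` explicitly matters here: breaking out of `async for` does not close an async generator by itself, so without it the cleanup would only happen when the generator is finalized later.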