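The following code merges several async generators into a single stream, yielding each item (tagged with its generator's key) as soon as it becomes available: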
import asyncio
class InternalStopAsyncIteration(Exception):
    """Raised in place of StopAsyncIteration to report which generator ended.

    The ``key`` attribute holds the identifier of the exhausted generator.
    """

    def __init__(self, key):
        super().__init__(key)
        self.key = key
async def anext(key, gen):
    """Fetch the next item of ``gen`` and pair it with ``key``.

    Returns a ``(key, item)`` tuple. When ``gen`` is exhausted, the
    StopAsyncIteration is translated into InternalStopAsyncIteration
    carrying ``key``.

    NOTE: this name shadows the builtin ``anext()`` on Python 3.10+.
    """
    try:
        item = await gen.__anext__()
    except StopAsyncIteration:
        raise InternalStopAsyncIteration(key)
    return key, item
async def combine_async_generators(**gens):
    """Merge several async generators, yielding items as they arrive.

    Args:
        **gens: Async generators, each passed as a keyword argument. The
            keyword is the key reported alongside every item that
            generator produces.

    Yields:
        ``(key, value)`` tuples, in the order the underlying generators
        produce values. Iteration ends once every generator is exhausted.

    Raises:
        Any exception raised by one of the generators, other than the
        internal end-of-iteration signal, is re-raised to the consumer.
    """
    # asyncio.wait() needs Task/Future objects: passing bare coroutines was
    # deprecated in Python 3.8 and raises TypeError from 3.11 onwards.
    pending = {
        asyncio.ensure_future(anext(key, gen)) for key, gen in gens.items()
    }
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                exc = task.exception()
                if isinstance(exc, InternalStopAsyncIteration):
                    # This generator is finished; stop tracking it.
                    gens.pop(exc.key)
                    continue
                # result() re-raises any other exception from the generator.
                key, val = task.result()
                # Request the following item before handing this one out, so
                # the generator keeps running while the consumer is busy.
                pending.add(asyncio.ensure_future(anext(key, gens[key])))
                yield key, val
    finally:
        # On early exit (consumer stops iterating, or an error propagates)
        # do not leave background tasks running. This is a no-op after
        # normal completion because ``pending`` is then empty.
        for task in pending:
            task.cancel()
# The following will print:
# a 0.5
# b 1
# a 0.5
# a 0.5
# b 1
# b 1
async def gen(x):
    """Yield ``x`` three times, sleeping ``x`` seconds before each yield."""
    emitted = 0
    while emitted < 3:
        await asyncio.sleep(x)
        yield x
        emitted += 1
async def run():
    """Print every (key, value) pair from two merged demo generators."""
    merged = combine_async_generators(a=gen(0.5), b=gen(1))
    async for pair in merged:
        name, value = pair
        print(name, value)
# asyncio.run() creates a fresh event loop, runs the demo to completion, then
# finalizes async generators and closes the loop. Calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
asyncio.run(run())