I have two functions:

async def f(i):
    await asyncio.sleep(1)
    return f'f{i}'


async def g(i):
    await asyncio.sleep(2)
    return f'g{i}'

I want to write a loop that calls them repeatedly and prints the results as they come. Like this imaginary code:

for c in amerge(amap(f, itertools.count()), 
                amap(g, itertools.count())):
    x = await c
    print(x)

The output should be approximately f0, f1, g0, f2, f3, g1, f4, f5, g2, ...

My attempt was this:

async def run():
    """bad, always wait for f, then for g"""
    for i in itertools.count():
        for c in asyncio.as_completed([f(i), g(i)]):
            res = await c
            print(res)

asyncio.run(run())

But this is not correct: it prints f0, g0, f1, g1, ...

Any ideas?

2 Answers

The aiostream library provides aiostream.stream.merge, which can be used to combine multiple async generators. If we rewrite your code like this:

import asyncio
import itertools

from aiostream import stream


async def f(i):
    await asyncio.sleep(1)
    return f"f{i}"


async def g(i):
    await asyncio.sleep(2)
    return f"g{i}"


async def amap(func, iterable):
    for i in iterable:
        res = await func(i)
        yield res


async def run():
    async with stream.merge(
        amap(f, itertools.count()), amap(g, itertools.count())
    ).stream() as streamer:
        async for x in streamer:
            print(x)


asyncio.run(run())

We get as output:

f0
f1
g0
f2
f3
g1
f4
f5
g2
f6
f7
g3
.
.
.

You'll note that the code here looks pretty much exactly like your pseudocode, except that amerge is provided by aiostream.stream.merge.
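
If adding a third-party dependency is not an option, a similar merge can be approximated with the standard library alone. The following is only a minimal sketch, not how aiostream implements it: amerge here is a hypothetical helper that runs one task per source and funnels items through an asyncio.Queue. It uses finite ranges and shorter sleeps (my assumptions) so that the example terminates, and it does not propagate exceptions raised by a source.

```python
import asyncio


async def f(i):
    await asyncio.sleep(0.1)  # shortened from 1s so the sketch runs quickly
    return f"f{i}"


async def g(i):
    await asyncio.sleep(0.25)  # shortened from 2s, still slower than f
    return f"g{i}"


async def amap(func, iterable):
    # Same helper as above: apply an async function to each item in turn.
    for i in iterable:
        yield await func(i)


async def amerge(*sources):
    """Hypothetical helper: yield items from several async iterators as they arrive."""
    queue = asyncio.Queue()
    sentinel = object()  # marks one exhausted source

    async def drain(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            # The queue is unbounded, so put_nowait never fails here.
            queue.put_nowait(sentinel)

    tasks = [asyncio.create_task(drain(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is sentinel:
                remaining -= 1
            else:
                yield item
    finally:
        # Stop any sources still running if the consumer exits early.
        for task in tasks:
            task.cancel()


async def collect():
    # Finite ranges instead of itertools.count() so the example ends.
    return [x async for x in amerge(amap(f, range(4)), amap(g, range(2)))]


results = asyncio.run(collect())
print(results)
```

With itertools.count() in place of the ranges, the same amerge can be consumed with async for inside run(), just like the streamer above.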

One way to do this is to use asyncio.wait with return_when=asyncio.FIRST_COMPLETED. Note that on each iteration you must schedule the next list of tasks to run:

import asyncio
import itertools


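async def f(i):
    await asyncio.sleep(1)
    # i is an iterator (a counter); also return a coroutine for the next call
    return f"f{next(i)}", f(i)


async def g(i):
    await asyncio.sleep(2)
    # same pattern as f: the result plus the coroutine to schedule next
    return f"g{next(i)}", g(i)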


async def run():
    tasks = [
        asyncio.create_task(f(itertools.count())),
        asyncio.create_task(g(itertools.count())),
    ]

    while True:
        # wait for first completed task:
        done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED
        )

        # print results:
        next_tasks = []
        for finished_task in done:
            s, nxt = finished_task.result()
            next_tasks.append(asyncio.create_task(nxt))
            print(s)

        # create next task list to run:
        tasks = [*pending, *next_tasks]


asyncio.run(run())

Prints:

f0
g0
f1
f2
g1
f3
f4
g2
f5

...
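
The same asyncio.wait idea can also be written without changing the signatures of f and g from the question. This is a rough sketch under a few assumptions of mine: the running dict, which remembers which function and counter each task came from, is an illustrative choice; the limit parameter is hypothetical and exists only so the example stops; and the sleeps are shortened.

```python
import asyncio
import itertools


async def f(i):
    await asyncio.sleep(0.1)  # shortened from 1s for the sketch
    return f"f{i}"


async def g(i):
    await asyncio.sleep(0.25)  # shortened from 2s for the sketch
    return f"g{i}"


async def run(limit):
    """Collect the first `limit` results (hypothetical parameter) and return them."""
    results = []
    # Map each running task to the function and counter that produced it.
    running = {}
    for func in (f, g):
        counter = itertools.count()
        running[asyncio.create_task(func(next(counter)))] = (func, counter)

    while len(results) < limit:
        done, _ = await asyncio.wait(
            list(running), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            func, counter = running.pop(task)
            results.append(task.result())
            # Reschedule the same function with its next index.
            running[asyncio.create_task(func(next(counter)))] = (func, counter)

    # Clean up whatever is still in flight.
    for task in running:
        task.cancel()
    await asyncio.gather(*running, return_exceptions=True)
    return results[:limit]


results = asyncio.run(run(5))
print(results)
```

Replacing the limit check with while True and printing each result instead of collecting it gives the endless loop asked for in the question.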
