8

Is there a good way, or a well-supported library, for merging async iterators in python3?

The desired behavior is basically the same as that of merging observables in reactivex.

That is, in the normal case, if I'm merging two async iterator, I want the resulting async iterator to yield results chronologically. An error in one of the iterators should derail the merged iterator.

Merging Observables

(Source: http://reactivex.io/documentation/operators/merge.html)

This is my best attempt, but it seems like something there might be a standard solution to:

async def drain(stream, q, sentinal=None):
    try:
        async for item in stream:
            await q.put(item)
        if sentinal:
            await q.put(sentinal)
    except BaseException as e:
        await q.put(e)


async def merge(*streams):

    q = asyncio.Queue()
    sentinal = namedtuple("QueueClosed", ["truthy"])(True)

    futures = {
        asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
    }

    remaining = len(streams)
    while remaining > 0:
        result = await q.get()
        if result is sentinal:
            remaining -= 1
            continue
        if isinstance(result, BaseException):
            raise result
        yield result


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    loop = asyncio.get_event_loop()

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    loop.run_until_complete(go())
1
  • how does the mergedelay resolve when two merge items arrive at the same time? does it randomly select one over the other Commented Jun 25, 2021 at 16:23

1 Answer 1

10

You can use aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

More examples in the documentation and this answer.

Sign up to request clarification or add additional context in comments.

2 Comments

Those links are returning 404.
@samfrances Oops, fixed!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.