A third-party API library provides an iterator for listing items, with built-in pagination. The iterator is blocking, and I would like to run multiple listings in parallel.

async def list_multiple(params_list):
    async_tasks = []
    for params in params_list:
        async_tasks.append(list_one(**params))
    await asyncio.gather(*async_tasks)


async def list_one(**kwargs):
    blocking_iterator = some_library.get_api_list_iterator(**kwargs)
    async for item in iterate_blocking(blocking_iterator):
        pass  # do things


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    while True:
        try:
            yield await loop.run_in_executor(None, iterator.next)
        except StopIteration:
            break

But doing this raises

TypeError: StopIteration interacts badly with generators and cannot be raised into a Future

and blocks all threads. How do I iterate a blocking iterator correctly?

1 Answer

Note that the method used for iteration is called __next__ in Python 3, not next. If next works here, it is probably because the library sets up some Python 2 compatibility code.
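As a quick check of the naming point, here is a minimal sketch using a plain list iterator (not the library's iterator, whose internals are unknown here). It only illustrates that built-in Python 3 iterators expose __next__ and that the next built-in is the usual way to call it:

```python
it = iter([1, 2, 3])

# Built-in Python 3 iterators define __next__, not next.
has_py2_style_next = hasattr(it, "next")
has_dunder_next = hasattr(it, "__next__")
print(has_py2_style_next, has_dunder_next)

# The next() built-in calls __next__ for you.
first = next(it)

# With a second argument, next() returns that default
# instead of raising StopIteration when the iterator is exhausted.
rest = [next(it), next(it)]
after_end = next(it, "exhausted")
print(first, rest, after_end)
```

A library object that still answers to .next presumably defines that attribute itself, which is the compatibility code guessed at above.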

You can fix the issue by catching StopIteration while still in the auxiliary thread, and using a different exception, or another kind of signal, to indicate the end of iteration. For example, this code uses a sentinel object:

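import asyncio

async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()  # unique sentinel that marks the end of iteration

    def get_next():
        # Runs in the executor thread, so StopIteration never
        # crosses into the Future.
        try:
            return iterator.__next__()
        except StopIteration:
            return DONE

    while True:
        obj = await loop.run_in_executor(None, get_next)
        if obj is DONE:
            break
        yield obj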

This can be further simplified using the two-argument form of the next built-in, which does essentially the same thing as get_next:

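import asyncio

async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()  # unique sentinel that marks the end of iteration
    while True:
        # next(iterator, DONE) returns DONE instead of raising StopIteration
        obj = await loop.run_in_executor(None, next, iterator, DONE)
        if obj is DONE:
            break
        yield obj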

(Both above examples are untested, so typos are possible.)
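To try the approach end to end, here is a small self-contained sketch. The slow_numbers generator is a hypothetical stand-in for the library's blocking iterator (it just sleeps to simulate a paginated network call), and list_one and list_multiple are simplified versions of the functions from the question that collect items into lists:

```python
import asyncio
import time


def slow_numbers(n, delay=0.05):
    """Hypothetical stand-in for a blocking, paginated API iterator."""
    for i in range(n):
        time.sleep(delay)  # simulates a blocking network call
        yield i


async def iterate_blocking(iterator):
    loop = asyncio.get_running_loop()
    DONE = object()  # unique sentinel that marks the end of iteration
    while True:
        # Each blocking next() call runs in the default thread pool.
        obj = await loop.run_in_executor(None, next, iterator, DONE)
        if obj is DONE:
            break
        yield obj


async def list_one(n):
    # Collect every item produced by one blocking iterator.
    return [item async for item in iterate_blocking(slow_numbers(n))]


async def list_multiple(sizes):
    # gather() runs the listings concurrently and keeps input order.
    return await asyncio.gather(*(list_one(n) for n in sizes))


results = asyncio.run(list_multiple([3, 2, 4]))
print(results)
```

Each listing still calls next sequentially on its own iterator, so no single iterator is advanced from two threads at once. The concurrency comes from different iterators blocking in different pool threads.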

6 Comments

Ah, I was hoping there is a better way. Thanks anyway!
@Andrei Better in the sense of avoiding the use of a sentinel, or better in the sense of avoiding run_in_executor altogether? Also, are you worried about efficiency or about the number of lines? The second snippet is fairly minimalistic in terms of LOC.
Just expected there was a way without an extra function and an object instance, but oh well, gotta limit my appetite for minimalistic code.
@Andrei The second snippet avoids the extra function by making use of the functionality of next. The object instance can be made global if you don't care about reentrancy, but I find it safer to just create it every time (and object() is as efficient as it gets).
@Andrei You can do as you wish, but that makes the code brittle, as you can't reuse it for other situations where returning the StopIteration object might be possible. The worst aspect of such a design is that if the code ever does that, it would just silently fail without any indication of an error.