
I'm trying to expose an event-based communication as a coroutine. Here is an example:

class Terminal:

    async def start(self):
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        t = threading.Thread(target=self.run_cmd, args=future)
        t.start()

        return await future

    def run_cmd(self, future):
        time.sleep(3)  # imitating doing something
        future.set_result(1)

But when I run it like this:

async def main():
    t = Terminal()
    result = await t.start()
    print(result)

asyncio.run(main())

I get the following error: RuntimeError: await wasn't used with future

Is it possible to achieve the desired behavior?

1 Answer

There are two issues with your code. The first is that the args argument to the Thread constructor expects a sequence or other iterable of arguments, so you need to wrap the future in a container, e.g. args=(future,). Since a future is itself iterable (for technical reasons unrelated to this use case), args=future is not immediately rejected, but it leads to the misleading error later down the line.
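To see where the misleading message comes from, here is a minimal sketch that unpacks a pending future by hand, which is roughly what happens when the thread calls its target with `*args`. The helper name `unpack_pending_future` is made up for this illustration, and the snippet assumes a reasonably recent CPython.

```python
import asyncio


async def unpack_pending_future():
    """Return the exception raised by iterating a pending future, if any."""
    future = asyncio.get_running_loop().create_future()
    try:
        # With args=future, the thread effectively does target(*future).
        # Iterating a pending future outside of `await` fails partway.
        tuple(future)
    except RuntimeError as exc:
        return exc
    finally:
        future.cancel()  # tidy up the future that is never resolved
    return None


if __name__ == "__main__":
    # Expected to show a RuntimeError like the one in the question.
    print(repr(asyncio.run(unpack_pending_future())))
```

With args=(future,) the tuple is what gets unpacked, so the future reaches run_cmd untouched.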

The other issue is that asyncio objects aren't thread-safe, so you cannot just call future.set_result from another thread. This causes the test program to hang even after fixing the first issue. The correct way to resolve the future from another thread is through the call_soon_threadsafe method on the event loop:

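import asyncio
import threading
import time

class Terminal:
    async def start(self):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        # args must be a tuple; pass the loop so the thread can reach it
        t = threading.Thread(target=self.run_cmd, args=(loop, future))
        t.start()
        return await future

    def run_cmd(self, loop, future):
        time.sleep(3)  # imitating doing something
        # schedule set_result to run inside the event loop's own thread
        loop.call_soon_threadsafe(future.set_result, 1)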
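
For completeness, here is a hedged end-to-end sketch of the same approach that can be run as a script. Two things are my own additions rather than part of the code above: the `delay` parameter (only there to keep the demo short) and the `try`/`except` that forwards a failure with `future.set_exception`, so the awaiting coroutine does not wait forever if the work raises.

```python
import asyncio
import threading
import time


class Terminal:
    def __init__(self, delay=0.1):
        self.delay = delay  # hypothetical knob, not in the original code

    async def start(self):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        threading.Thread(target=self.run_cmd, args=(loop, future)).start()
        return await future

    def run_cmd(self, loop, future):
        try:
            time.sleep(self.delay)  # stand-in for the real blocking work
            result = 1
        except Exception as exc:
            # Hand the failure to the loop thread instead of losing it here.
            loop.call_soon_threadsafe(future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(future.set_result, result)


async def main():
    return await Terminal().start()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both outcomes go through call_soon_threadsafe, so the future is only ever touched from the event loop's thread.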

If your thread is really just calling a blocking function whose result you're interested in, consider using run_in_executor instead of manually spawning threads:

import asyncio
import time

class Terminal:
    async def start(self):
        loop = asyncio.get_running_loop()
        # None selects the loop's default thread pool executor
        return await loop.run_in_executor(None, self.run_cmd)

    # Executed in a different thread; `run_in_executor` submits the
    # callable to a thread pool, suspends the awaiting coroutine until
    # it's done, and transfers the result/exception back to asyncio.
    def run_cmd(self):
        time.sleep(3)
        return 1
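
As a small illustration of the "result/exception" part of that comment, the sketch below runs a function in the default executor twice, once returning normally and once raising. The `fail` flag and the ValueError message are invented for the demo.

```python
import asyncio


def run_cmd(fail):
    # Runs in a worker thread of the default executor.
    if fail:
        raise ValueError("command failed")
    return 1


async def main():
    loop = asyncio.get_running_loop()
    # Extra positional arguments after the callable are passed to it.
    ok = await loop.run_in_executor(None, run_cmd, False)
    try:
        await loop.run_in_executor(None, run_cmd, True)
    except ValueError as exc:
        # The exception raised in the worker thread surfaces at the await.
        error = str(exc)
    else:
        error = None
    return ok, error


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The caller handles the failure with an ordinary try/except around the await, with no manual future bookkeeping.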