I am having a difficult time understanding Python's asyncio. I have not written any code, because all the examples I see are for one-off runs: create a few coroutines, add them to an event loop, run the loop, it switches between the tasks, done. That does not seem all that helpful for me.

I want to use asyncio so that long-running operations do not block my application (which uses PyQt5). I want to create some functions that, when called, run in the asyncio event loop and invoke a callback when they are done.

What I imagine is this: create a separate thread for asyncio, create the loop, and run it forever. Create some functions such as getFile(url, fp), get(url), readFile(file), etc. Then in the UI I have a text box with a submit button; the user enters a URL, clicks submit, and the file is downloaded.

But in every example I see, I cannot tell how to add a coroutine to a running loop, and I do not see how I could do what I want without adding to a running loop.

#!/bin/python3
import asyncio
import aiohttp
import threading

loop = asyncio.get_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def _get(url, callback):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            result = await response.text()
            callback(result)
            return

def get(url, callback):
    asyncio.ensure_future(_get(url, callback))

thread = threading.Thread(target=async_in_thread, args=(loop, ))

thread.start()

def stop():
    loop.close()

def callme(data):
    print(data)
    stop()

get("http://google.com", callme)

thread.join()

This is what I imagine, but it does not work.

  • asyncio.ensure_future() will do that. Commented Jun 16, 2018 at 19:13
  • @MartijnPieters The OP wants to add coroutines from a different thread, which ensure_future is not designed to do. Commented Jun 16, 2018 at 21:16
  • @user4815162342: fair enough. I was focusing on the 'how do I give the asyncio event loop a new task' part. Commented Jun 16, 2018 at 22:20

2 Answers

To add a coroutine to a loop running in a different thread, use asyncio.run_coroutine_threadsafe:

def get(url, callback):
    asyncio.run_coroutine_threadsafe(_get(url, callback), loop)

In general, when you are interacting with the event loop from outside the thread that runs it, you must run everything through either run_coroutine_threadsafe (for coroutines) or loop.call_soon_threadsafe (for functions). For example, to stop the loop, use loop.call_soon_threadsafe(loop.stop). Also note that loop.close() must not be invoked inside a loop callback, so you should place that call in async_in_thread, right after the call to run_forever(), at which point the loop has definitely stopped running.
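The start, submit, stop, and close sequence described above can be sketched end to end as follows. This is a minimal sketch under stated assumptions: `fake_get` is a hypothetical stand-in for the aiohttp-based `_get` (it only sleeps, so the example needs no network), and the loop is created with `asyncio.new_event_loop()` rather than `get_event_loop()`.

```python
import asyncio
import threading

# A dedicated loop that will be run by the worker thread.
loop = asyncio.new_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() has returned, so the loop is stopped and safe to close.
        loop.close()

async def fake_get(url):
    # Hypothetical stand-in for the aiohttp-based _get; no real request is made.
    await asyncio.sleep(0.01)
    return "response from " + url

thread = threading.Thread(target=async_in_thread, args=(loop,))
thread.start()

# Submit a coroutine from the main thread; note the explicit loop argument.
future = asyncio.run_coroutine_threadsafe(fake_get("http://example.com"), loop)
result = future.result(timeout=5)

# loop.stop is a plain function, so it is scheduled with call_soon_threadsafe.
loop.call_soon_threadsafe(loop.stop)
thread.join()
```

After `thread.join()` returns, the worker thread has already closed the loop, so nothing outside that thread ever calls `loop.close()`.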

Another thing with asyncio is that passing explicit when_done callbacks isn't idiomatic because asyncio exposes the concept of futures (akin to JavaScript promises), which allow attaching callbacks to a not-yet-available result. For example, one could write _get like this:

async def _get(url):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

It doesn't need a callback argument because any interested party can convert it to a task using loop.create_task and use add_done_callback to be notified when the task is complete. For example:

def _get_with_callback(url, callback):
    loop = asyncio.get_event_loop()
    task = loop.create_task(_get(url))
    task.add_done_callback(lambda _fut: callback(task.result()))
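As a rough, single-threaded illustration of the same idea, the sketch below creates a task inside a running loop and attaches a done callback to it. The `fetch` coroutine and the `results` list are hypothetical names used only for this example, and `fetch` sleeps instead of performing a real request.

```python
import asyncio

results = []

async def fetch(url):
    # Hypothetical stand-in for _get; it sleeps instead of doing real I/O.
    await asyncio.sleep(0.01)
    return "body of " + url

async def main():
    loop = asyncio.get_running_loop()
    task = loop.create_task(fetch("http://example.com"))
    # The callback receives the finished task and reads its result.
    task.add_done_callback(lambda fut: results.append(fut.result()))
    await task
    # Done callbacks are scheduled on the loop, so yield once to be safe.
    await asyncio.sleep(0)

asyncio.run(main())
```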

In your case you're not dealing with the task directly because your code aims to communicate with the event loop from another thread. However, run_coroutine_threadsafe returns a very useful value - a full-fledged concurrent.futures.Future which you can use to register done callbacks. Instead of accepting a callback argument, you can expose the future object to the caller:

def get(url):
    return asyncio.run_coroutine_threadsafe(_get(url), loop)

Now the caller can choose a callback-based approach:

future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...

or, when appropriate, they can even wait for the result:

# give me the response, I'll wait for it
result = get(url).result()

The latter is by definition blocking, but since the event loop is safely running in a different thread, it is not affected by the blocking call.
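Both styles can be tried without a network connection using the sketch below. It assumes a hypothetical `fake_get` coroutine in place of the aiohttp-based `_get`, and a `threading.Event` that exists only so the example can wait for the callback to fire.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
thread.start()

async def fake_get(url):
    # Hypothetical stand-in for _get; no real HTTP request is made.
    await asyncio.sleep(0.01)
    return "response from " + url

def get(url):
    # Returns a concurrent.futures.Future tied to the coroutine's result.
    return asyncio.run_coroutine_threadsafe(fake_get(url), loop)

seen = {}
callback_done = threading.Event()

def some_callback(fut):
    # The callback receives the completed future.
    seen["result"] = fut.result()
    callback_done.set()

# Callback style: register interest and carry on with other work.
future = get("http://example.com")
future.add_done_callback(some_callback)
callback_done.wait(timeout=5)

# Blocking style: wait for the value in the calling thread.
blocking_result = get("http://example.org").result(timeout=5)

# Shut the loop down from outside its thread, then close it.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

One point to keep in mind: a done callback registered on this future normally runs in the event loop's thread rather than the caller's, so a GUI program would likely need to hand the result back to the GUI thread (for example through a Qt signal) before touching widgets.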

4 Comments

This is exactly what I was trying to do. I just have a followup, how do I properly close this now? I got rid of callbacks, I am using the Future object, get returns the run_coroutine_threadsafe now, data = get(...) then I do data.result() (all this works perfectly). But, when I do loop.call_soon_threadsafe(loop.close) just below data.result() I get an error "Cannot close a running event loop". I tried wrapping a loop.stop() coroutine like get above, but that did not work either.
@Drew Since loop.stop() is not a coroutine, you should use loop.call_soon_threadsafe(loop.stop). As for loop.close(), best call it from async_in_thread, right after the call to loop.run_forever(). (run_forever() having completed is proof that loop.stop() has been processed and that the loop is no longer running.)
Ok, that makes sense since run_forever() is blocking, adding close() does nothing, then in my exit call before shutting down application I run loop.call_soon_threadsafe(loop.stop) then it shuts down nicely, that worked perfectly. Thank you.
@Drew In case of a GUI program, an equally valid option is to start the event loop thread at the beginning (with daemon set to true), and never bothering with stop/close. Closing the event loop is important for temporary event loops created by a library (or a test runner, and so on), where it serves to release the resources held by the loop itself. In a more typical asyncio scenario, there is only one event loop and it runs for as long as the program, and whether close() is called when the program is exiting anyway is of little importance.
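The daemon-thread option from the last comment might look like the following minimal sketch. The `ping` coroutine is a hypothetical placeholder for real work, and the loop is deliberately never stopped or closed, because the daemon thread simply ends with the process.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
# daemon=True: the thread will not keep the program alive at exit.
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

async def ping():
    # Hypothetical placeholder for real asynchronous work.
    await asyncio.sleep(0.01)
    return "pong"

reply = asyncio.run_coroutine_threadsafe(ping(), loop).result(timeout=5)
```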

Install quamash to smooth the integration between Qt and asyncio.

The example from the project's README gives an idea of what it looks like:

import sys
import asyncio
import time

from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)  # NEW must set the event loop

progress = QProgressBar()
progress.setRange(0, 99)
progress.show()

async def master():
    await first_50()
    with QThreadExecutor(1) as executor:  # avoid shadowing the built-in exec
        await loop.run_in_executor(executor, last_50)

async def first_50():
    for i in range(50):
        progress.setValue(i)
        await asyncio.sleep(.1)

def last_50():
    for i in range(50,100):
        loop.call_soon_threadsafe(progress.setValue, i)
        time.sleep(.1)

with loop: ## context manager calls .close() when loop completes, and releases all resources
    loop.run_until_complete(master())
