1

I am using FastApi and have one endpoint.

I have two long running functions which I want to run concurrently using asyncio

Therefore, I have created two functions:

async def get_data_one():
    return 'done_one'

async def get_data_two():
    return 'done_two'

These functions get data from external webservices.

I want to execute them concurrently so I have created a another function that does it:

async def get_data():
    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)
    task_1 = loop.create_task(get_data_one)
    task_2 = loop.create_task(get_data_two)
    tasks = (task_1, task_2)
    first, second = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()

    # I will then perform cpu intensive computation on the results
    # for now - assume i am just concatenating the results
    return first + second

Finally, I have my endpoint:

@app.post("/result")
async def get_result_async():
    r = await get_data()

    return r

Even this simple example breaks and I get the following exception when I hit the endpoint:

RuntimeError: This event loop is already running ERROR: _GatheringFuture exception was never retrieved future: <_GatheringFuture finished exception=AttributeError("'function' object has no attribute 'send'",)> AttributeError: 'function' object has no attribute 'send'

This is a simplified code but I would really appreciate how to do it the right way.

2 Answers 2

7

When in FastAPI context, you never need to run an asyncio loop; it's always running for as long as your server process lives.

Therefore, all you need is

import asyncio


async def get_data_one():
    return "done_one"


async def get_data_two():
    return "done_two"


async def get_data():
    a, b = await asyncio.gather(get_data_one(), get_data_two())
    return a + b


#@route decorator here...
async def get_result_async():
    r = await get_data()
    print("Get_data said:", r)
    return r


# (this is approximately what is done under the hood,
#  presented here to make this a self-contained example)
asyncio.run(get_result_async())
Sign up to request clarification or add additional context in comments.

2 Comments

Do I need to lock any data in get_data function?or async will run each request in its own 'container' and I don't need to worry about making code aync/thread safe?
They're not run in isolation - if you have global state, you need to take care of it like you would with threads.
2

It's as simple as:

async def get_data():
    first, second = await asyncio.gather(
        get_data_one(),
        get_data_two(),
    )

    return first + second

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.