
I am trying to process a file uploaded by a user. I want the user to receive a response as soon as the upload completes, and the connection to close, while the file continues to be processed. Therefore I am using BackgroundTasks.add_task, and my code looks something like this:

class Line(BaseModel):
    line: str

@app.post("/foo")
async def foo(line: Line):
    """Process a line and generate results"""

    ...

    result = ...  # processing line.line
    print(result)
    return result

@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):

    background_tasks.add_task(process, csv)
    return {"message": "CSV has been uploaded successfully"}


async def process(csv):
    """Process the CSV and generate data"""

    tasks = [foo(Line(line=line)) for line in csv.file]
    result = await asyncio.gather(*tasks)

Unfortunately, the code above processes the lines one by one. Moreover, I have to wait until all the lines are done before any of the print statements in foo fire; i.e. if the CSV has n lines, I only see the print output after all n have been processed. My app runs with 20 workers, but while this background process is running it uses only about 1% of the CPU (foo is not a computational task; it is IO/network bound). This makes me think the background task is running on only one worker. I did try ProcessPoolExecutor as follows:

loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
    results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
    results = loop.run_until_complete(*results)

However, I get the following error:

processpoolexecutor can't pickle local object

I did manage to get over that error by changing my approach from:

results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]

to:

results = [asyncio.ensure_future(foo(line=Line(line=line))) for line in lines]

However, then I get this error:

File "uvloop/loop.pyx", line 2658, in uvloop.loop.Loop.run_in_executor AttributeError: 'Loop' object has no attribute 'submit'

To summarize: to process one line, I can hit the "/foo" endpoint. Now I want to process a CSV of 200 lines. So I first accept the file from the user, return a success message, and close that connection. The CSV is then handed to a background task, which should map each line to the "/foo" endpoint and give me the result for each line. However, every approach I have tried so far seems to use only one thread and processes the lines one by one. I would like an approach where multiple lines are processed together, almost as if I were hitting the "/foo" endpoint several times simultaneously, the way a load-testing tool like Apache JMeter can.
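To illustrate the behaviour I am after, here is a self-contained sketch (all names are illustrative, and asyncio.sleep stands in for the IO/network-bound work that foo does): coroutines started together with asyncio.gather overlap, so the total time should be roughly that of one call, not the sum of all of them.

```python
import asyncio
import time


async def fake_request(line: str) -> str:
    await asyncio.sleep(0.2)  # stands in for the IO/network-bound work in /foo
    return f"processed {line}"


async def main():
    lines = [f"line_{i}" for i in range(10)]
    start = time.perf_counter()
    # All ten coroutines are in flight at once, so the ten 0.2 s
    # sleeps overlap instead of adding up.
    results = await asyncio.gather(*(fake_request(line) for line in lines))
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(f"{len(results)} lines in {elapsed:.2f}s")
```

That overlapping behaviour, applied to real requests against /foo, is what I am trying to achieve.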

1 Answer

You could do the processing in parallel without going through the endpoint. Below is a simplified example (it does not call the foo endpoint) based on your code:

import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger


logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")

app = FastAPI()


async def async_io_bound(line: str):
    await asyncio.sleep(3)  # Pretend this is IO operations
    return f"Line '{line}' processed"


async def process(csv):
    """ Processing CSV and generate data"""
    tasks = [async_io_bound(line) for line in csv]
    logger.info("start processing")
    result = await asyncio.gather(*tasks)
    for i in result:
        logger.info(i)


@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
    background_tasks.add_task(process, csv.file)
    return {"result": "CSV has been uploaded successfully"}

if __name__ == "__main__":
    uvicorn.run("app3:app", host="localhost", port=8001)

Example of output (all lines were processed in parallel):

INFO:     ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed

2 Comments

I did try to implement your suggestion, but the lines were still processed one by one. I ended up resorting to arq to run the processing in parallel. I chose arq over Celery because my functions are async; Celery does not support async functions as of the time of this comment (support is expected in v5, due for release in December 2020).
It would be easier to help if you showed the actual implementation of your line processing.
