I am trying to process a file uploaded by the user. I want the user to get a response as soon as the upload completes, then terminate that connection and carry on processing the file in the background. For this I am using BackgroundTasks.add_task, and my code looks something like this:
from fastapi import BackgroundTasks, FastAPI, File, UploadFile
from pydantic import BaseModel
import asyncio

app = FastAPI()

class Line(BaseModel):
    line: str

@app.post("/foo")
async def foo(line: Line):
    """Process a single line and generate a result."""
    ...
    result = ...  # processing line.line
    print(result)
    return result
@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
background_tasks.add_task(process, csv)
return response.text("CSV has been uploaded successfully")
async def process(csv):
    """Process the CSV and generate data."""
    lines = ...  # extract the lines from the uploaded CSV
    tasks = [foo(Line(line=line)) for line in lines]
    results = await asyncio.gather(*tasks)
Unfortunately, the code above executes the lines one by one. Moreover, I have to wait until all the lines are processed before the print statement in foo fires; that is, if the CSV has n lines, I only see the print output for all n lines after every one of them has finished. My program runs with 20 workers, but while this background task is running it uses only about 1% of the CPU (foo is not a computational task; it is more of an IO/network-bound task). This makes me think the background task is running on a single worker only. I did try ProcessPoolExecutor, as follows:
import asyncio
from concurrent.futures import ProcessPoolExecutor

loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n]  # extracted all lines from the CSV

with ProcessPoolExecutor() as executor:
    results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
    results = loop.run_until_complete(asyncio.gather(*results))
However, I get the following error:

ProcessPoolExecutor: can't pickle local object
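If I understand correctly, ProcessPoolExecutor has to pickle the callable so it can be shipped to a worker process, and a lambda defined inside a function is not picklable; a minimal sketch of the kind of module-level function I believe it wants (process_line is a hypothetical name, not part of my app):

from concurrent.futures import ProcessPoolExecutor

def process_line(line: str) -> str:
    # A plain module-level function is picklable, so it can be
    # sent to a worker process; a lambda or closure cannot.
    return line.upper()

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_line, ["a", "b", "c"]))
    print(results)  # ['A', 'B', 'C']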
I did manage to get over that error by changing my approach from:
results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
to:
results = [asyncio.ensure_future(foo(line=Line(line=line))) for line in lines]
However, then I get this error:
File "uvloop/loop.pyx", line 2658, in uvloop.loop.Loop.run_in_executor AttributeError: 'Loop' object has no attribute 'submit'
To summarize: to process one line, I can hit the /foo endpoint. Now I want to process a CSV of 200 lines. So I first accept the file from the user, return a success message, and terminate that connection. The CSV is then handed to a background task, which should map each line to the /foo endpoint and give me the result for each line. However, all the approaches I have tried so far seem to use only one thread and process the lines one by one. I would like an approach where I can process multiple lines together, almost as if I were hitting the /foo endpoint several times simultaneously, the way a tool like Apache JMeter can.
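For illustration, the behaviour I am after looks roughly like the sketch below, which fires one POST to /foo per line and runs them all concurrently with httpx (httpx and the localhost URL are my assumptions, not what my app currently does):

import asyncio
import httpx

async def hit_foo(client: httpx.AsyncClient, line: str):
    # One request to /foo per line; all of them are in flight at once.
    response = await client.post("http://localhost:8000/foo", json={"line": line})
    return response.json()

async def process_lines(lines: list[str]):
    async with httpx.AsyncClient() as client:
        tasks = [hit_foo(client, line) for line in lines]
        return await asyncio.gather(*tasks)

results = asyncio.run(process_lines(["line_0", "line_1"]))
print(results)

Is something along these lines the right way to get JMeter-style concurrency from inside a background task, or is there a better pattern?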