Here’s the pattern I want:
- Dispatch multiple tasks in parallel.
- Aggregate all their results into a final result.
- Remove the intermediate results right after the chord result is ready, without blocking the execution.
What's the problem with intermediate results?
I use CouchDB as a result backend, which doesn't support a TTL/expiry feature. The intermediate results tend to be quite large and redundant, since only the aggregated result is needed.
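Since the backend won't expire results on its own, the cleanup has to happen explicitly in application code. As a minimal stand-in for the lifecycle (a plain dict instead of a real result backend; `store`/`forget` are just illustrative helpers, not Celery API):

```python
# Stand-in for a result backend without TTL support:
# entries accumulate forever unless explicitly removed ("forgotten").
backend = {}

def store(task_id, value):
    backend[task_id] = value

def forget(task_id):
    backend.pop(task_id, None)

# Three "intermediate" results, then the aggregate built from them.
for i, v in enumerate([10, 20, 30]):
    store(f"task-{i}", v)
store("aggregate", sum(backend.values()))

# Once the aggregate exists, the intermediates are redundant.
for i in range(3):
    forget(f"task-{i}")

print(backend)  # → {'aggregate': 60}
```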
Example sketch of what I want
    from celery import Celery, chord

    app = Celery("tasks")  # broker/backend configuration omitted in this sketch

    @app.task
    def worker_task(arg):
        return compute_something(arg)

    @app.task
    def aggregate(results):
        return sum(results)

    def cleanup(final_result):
        # Forget every intermediate result once the chord callback has run.
        for r in final_result.parent.results:
            r.forget()

    promise = chord(worker_task.s(arg) for arg in inputs)(aggregate.s())
    promise.then(cleanup)
P.S. `AsyncResult` actually has a `then()` method, but I couldn't figure out how to use it.
I tested a version with `final_result = chord(...)(...)` followed by `promise.then(cleanup)`, and it works for me with Redis. You may add some `print()` calls in the functions to see when they are executed and what they receive in their variables. This way I can see that `cleanup` runs and removes the tasks; of course I also checked the tasks directly in Redis afterwards to confirm that the removal really happened.

One caveat: until you call `.get()`, the chord may not have run all its calculations, so you risk removing the intermediate workers before it has used them. The only idea I have for that is to assign `(worker_task.s(arg) for arg in inputs)` to a variable so that all the task IDs are available, copy all the intermediate results into a local variable, and only then remove the workers.

Full code: pastebin.org: Python - celery - remove intermediate results from Redis - (Stackoverflow) - Pastebin.com. The function `version_1` there assigns the workers to a variable and uses their IDs directly to remove them. At the end it checks `if promise.ready(): print("RESULT:", promise.get())`.
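The "assign the workers to a variable and remove them by ID" approach can be sketched with plain-Python stand-ins for the Celery pieces (`FakeResult` and the `backend` dict are hypothetical substitutes for `AsyncResult` and Redis, not Celery API; the real code would dispatch `worker_task.s(arg)` signatures through a chord):

```python
import uuid

backend = {}  # stand-in for the Redis result store

class FakeResult:
    """Hypothetical stand-in for AsyncResult: has an id, can get and forget."""
    def __init__(self, value):
        self.id = str(uuid.uuid4())
        backend[self.id] = value

    def get(self):
        return backend[self.id]

    def forget(self):
        backend.pop(self.id, None)

# "Dispatch" the workers and keep the handles in a variable, so their
# ids are known independently of the chord's own bookkeeping.
workers = [FakeResult(arg * 2) for arg in [1, 2, 3]]

# Copy every intermediate result into a local variable *before* removal,
# so nothing is forgotten while it is still needed.
values = [w.get() for w in workers]
final = sum(values)

# Now it is safe to remove the intermediates by their ids.
for w in workers:
    w.forget()

print(final, backend)  # → 12 {}
```

The key design point is using a list rather than a one-shot generator, so the result handles (and their IDs) remain accessible after dispatch.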