
Here’s the pattern I want:

  1. Dispatch multiple tasks in parallel.
  2. Aggregate all their results into a final result.
  3. Remove the intermediate results right after the chord result is ready, without blocking the execution.

What's the problem with intermediate results?

I use CouchDB as the result backend, and it doesn't support a TTL feature. The intermediate results tend to be quite large, and they are redundant since only the aggregated result is needed.


Example sketch of what I want:

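from celery import chord

# `app` is the Celery application; `inputs` and `compute_something` are defined elsewhere.

@app.task
def worker_task(arg):
    return compute_something(arg)

@app.task
def aggregate(results):
    return sum(results)

def cleanup(final_result):
    # final_result is expected to be the chord's AsyncResult,
    # whose parent is the GroupResult of the header tasks.
    for r in final_result.parent.results:
        r.forget()

promise = chord((worker_task.s(arg) for arg in inputs))(aggregate.s())

# Desired: run cleanup once the chord result is ready, without blocking.
promise.then(cleanup)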

P.S. AsyncResult actually has a then() method, but I couldn't figure out how to use it.


  • 1
    Frankly, I don't understand why the intermediate results are a problem. If you don't need them later, simply ignore them. Or delete them manually if you really don't need them. Commented Aug 13 at 11:25
  • 1
    I run it directly after final_result = chord(...)(...). I also tested the version with promise.then(cleanup), and it works for me with Redis. You can add some print() calls to the functions to see when they are executed and what they receive in their variables. That way I can see that cleanup works and removes the tasks. Afterwards I check the tasks directly in Redis to confirm that it really works. Commented Aug 14 at 12:08
  • 1
    I'm not sure (I didn't test it), but Celery can be "lazy": if you don't run .get(), it may not run any calculations, so you may be trying to remove the intermediate workers before it has tried to use them. My only idea: assign (worker_task.s(arg) for arg in inputs) to a variable to get all the IDs, use them after fetching all the intermediate results into a local variable, and then remove the workers. Commented Aug 14 at 13:07
  • 1
    I put my test code on pastebin.org: Python - celery - remove intermediate results from Redis - (Stackoverflow) - Pastebin.com. The function version_1 assigns the workers to a variable and uses their IDs directly to remove them. Commented Aug 14 at 13:11
  • 1
    Thank you for providing examples! I figured out that we can poll the task without blocking like this: if promise.ready(): print("RESULT:", promise.get()) Commented Aug 14 at 13:21

1 Answer


This is a common issue when results are large. What I do in these situations is use a cache server or database to store the results until they are collected. Your chord member tasks return IDs instead of the actual results, and they are responsible for storing the actual results in the persistence layer. The chord callback then receives the IDs, grabs the large objects from the cache, database, or wherever you stored them (it can even be S3), and performs the cleanup. The cleanup part of the callback can safely delete the large objects from the persistence layer. The problem, however, is what happens if the callback fails midway or before the cleanup. For that you will need an external cleanup service, which is trivial to implement. At the very basic level, it can delete objects older than a week or two.
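A minimal sketch of this idea, using only the standard library so it runs without a broker. Everything here is an assumption made for illustration: `STORE` is a hypothetical in-memory stand-in for the cache/database/S3 layer, and `put_object`, `get_object`, `delete_object`, and `purge_older_than` are made-up helper names. In a real setup `worker_task` and `aggregate` would be Celery tasks wired together with a chord, and the purge function would run as a separate periodic job.

```python
import time
import uuid

# Hypothetical in-memory stand-in for the persistence layer (cache, DB, S3, ...).
# Maps object ID -> (creation timestamp, payload).
STORE = {}


def put_object(payload, now=None):
    """Store a payload and return its small ID."""
    object_id = uuid.uuid4().hex
    STORE[object_id] = (time.time() if now is None else now, payload)
    return object_id


def get_object(object_id):
    """Load a payload by ID."""
    return STORE[object_id][1]


def delete_object(object_id):
    """Delete a payload; a missing ID is ignored."""
    STORE.pop(object_id, None)


def worker_task(arg):
    """Chord member: store the large result and return only its ID."""
    large_result = [arg] * 1000  # placeholder for the real computation
    return put_object(large_result)


def aggregate(object_ids):
    """Chord callback: load by ID, aggregate, then delete the intermediates."""
    total = sum(sum(get_object(oid)) for oid in object_ids)
    for oid in object_ids:
        delete_object(oid)
    return total


def purge_older_than(max_age_seconds, now=None):
    """Safety net: drop objects left behind by runs that failed before cleanup."""
    now = time.time() if now is None else now
    stale = [oid for oid, (created, _) in STORE.items()
             if now - created > max_age_seconds]
    for oid in stale:
        delete_object(oid)
    return len(stale)
```

With this layout the result backend only ever holds short ID strings, so it matters much less that it has no TTL. Calling `aggregate([worker_task(a) for a in (1, 2, 3)])` leaves `STORE` empty again, and `purge_older_than(7 * 24 * 3600)` covers the case where the callback never got that far.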

I use the same approach for parameters. If the parameters are too large when serialised, it is often much better to pass IDs to your tasks, which then load the actual data from the persistence layer (cache, DB, object storage) when executed.
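The same thing for parameters might look roughly like this. Again this is only a sketch under assumptions: `PARAM_STORE` and `stash_params` are hypothetical names standing in for your real persistence layer, and `process` stands in for a Celery task body.

```python
import uuid

# Hypothetical in-memory stand-in for the persistence layer.
PARAM_STORE = {}


def stash_params(data):
    """Caller side: store the large input and get back a small ID."""
    param_id = uuid.uuid4().hex
    PARAM_STORE[param_id] = data
    return param_id


def process(param_id):
    """Task body: receives only the ID and loads the data when executed."""
    data = PARAM_STORE[param_id]
    return len(data)
```

The caller would do something like `process(stash_params(big_list))`, so the message that travels through the broker carries a 32-character ID rather than the serialised data.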

General rule of thumb: make your function parameters and return values as small as possible. This is particularly important for tasks with very short execution times that are typically executed thousands or even millions of times per day.
