I am having a problem executing a Celery task from another Celery task.

Here is the problematic snippet (the data object already exists in the database; its attributes are just updated inside the finalize_data function):

def finalize_data(data):
    data = update_statistics(data)
    data.save()
    from apps.datas.tasks import optimize_data
    optimize_data.delay(data.pk)

@shared_task
def optimize_data(data_pk):
    data = Data.objects.get(pk=data_pk)
    # Do something with data

The get() call in the optimize_data function fails with "Data matching query does not exist."

If I retrieve the object by pk inside the finalize_data function, it works fine. It also works fine if I delay the Celery task call for some time.

This line:

optimize_data.apply_async((data.pk,), countdown=10)

instead of

optimize_data.delay(data.pk)

works fine. But I don't want to use hacks in my code. Is it possible that the .save() call is asynchronously blocking access to that row/object?

  • If the data object already exists, you should not get that error. If there were a block, you would more likely see a timeout caused by some misconfiguration. Does using apply_async() without the countdown work? Commented Aug 26, 2015 at 14:58
  • I'm guessing your caller is inside a transaction that hasn't committed before Celery starts to process the task. Hence Celery can't find the record. That is why adding a countdown makes it work. Does a 1 second countdown work? I've used 1 second countdowns throughout my code to deal with this issue. Another solution is to stop using transactions. Commented Aug 26, 2015 at 22:34
  • Which version of Django are you using? Commented Aug 27, 2015 at 9:55
  • @Lee good call, the caller was inside a transaction that hadn't committed yet (finalize_data was called within transaction.atomic). I refactored the code and everything works fine. Leave an answer so I can mark it as resolved. Thanks everyone for the help. Commented Aug 28, 2015 at 12:40

3 Answers

9

I know that this is an old post, but I stumbled on this problem today. Lee's answer pointed me in the right direction, but I think a better solution exists today.

Using the on_commit handler provided by Django, this problem can be solved without countdowns, which are hackish and give the reader no hint about why they exist.

I'm not sure if this existed when the question was posted but I'm just posting the answer so that people who come here in the future know about the alternative.


3 Comments

I feel this is the more proper way. However, I have a concern about it. Basically, the code order would be: first "object.save()", then "transaction.on_commit(func)", right? Could the commit signal be fired before the second line is executed? In that case, the "func" passed in the second line won't be called.
Valid question. My answer assumes that both lines of code are wrapped in a transaction, which guarantees that the on_commit func will be executed when the transaction is committed successfully.
Aha, I just found that it's actually not a problem. The "func" will still get called, per the documentation: "if you call on_commit() while there isn’t an active transaction, the callback will be executed immediately." docs.djangoproject.com/en/2.2/topics/db/transactions/…

8

I'm guessing your caller is inside a transaction that hasn't committed before Celery starts to process the task. Hence Celery can't find the record. That is why adding a countdown makes it work.

A 1 second countdown will probably work as well as the 10 second one in your example. I've used 1 second countdowns throughout my code to deal with this issue.

Another solution is to stop using transactions.


5

You could use an on_commit hook to make sure the Celery task isn't triggered until after the transaction commits.

See "Performing actions after commit" in the Django documentation.

It's a feature that was added in Django 1.9.

from django.db import transaction

def do_something():
    pass  # send a mail, invalidate a cache, fire off a Celery task, etc.

transaction.on_commit(do_something)

You can also wrap your function in a lambda:

transaction.on_commit(lambda: some_celery_task.delay('arg1'))

The function you pass in will be called immediately after a hypothetical database write made where on_commit() is called would be successfully committed.

If you call on_commit() while there isn’t an active transaction, the callback will be executed immediately.

If that hypothetical database write is instead rolled back (typically when an unhandled exception is raised in an atomic() block), your function will be discarded and never called.
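The three rules quoted above (run after commit, run immediately when no transaction is active, discard on rollback) can be illustrated with a toy model. This is only a sketch of the behaviour, not Django's implementation: ToyConnection and its methods are hypothetical names, and the sent list stands in for the Celery queue.

```python
class ToyConnection:
    """Toy model of the on_commit rules; not how Django implements them."""

    def __init__(self):
        self.in_transaction = False
        self._callbacks = []

    def begin(self):
        self.in_transaction = True

    def on_commit(self, func):
        if self.in_transaction:
            self._callbacks.append(func)  # defer until commit()
        else:
            func()  # no active transaction: run immediately

    def commit(self):
        self.in_transaction = False
        callbacks, self._callbacks = self._callbacks, []
        for func in callbacks:
            func()

    def rollback(self):
        self.in_transaction = False
        self._callbacks = []  # callbacks are discarded, never called


sent = []  # stands in for the Celery queue
conn = ToyConnection()

# 1. Inside a transaction: the task is queued only once the commit happens.
conn.begin()
conn.on_commit(lambda: sent.append("task-1"))
queued_before_commit = list(sent)
conn.commit()
queued_after_commit = list(sent)

# 2. Rolled-back transaction: the callback is dropped.
conn.begin()
conn.on_commit(lambda: sent.append("task-2"))
conn.rollback()

# 3. No active transaction: the callback runs right away.
conn.on_commit(lambda: sent.append("task-3"))

print(queued_before_commit, queued_after_commit, sent)
```

In the asker's case, this corresponds to passing a callable that calls optimize_data.delay(data.pk) to transaction.on_commit() inside the atomic block, so the worker is only asked to load the row once it has been committed.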
