1

I'm running multiple simulations as tasks through celery (version 2.3.2) from django. The simulations get set up by another task:

In views.py:

result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id

In tasks.py:

@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids

After the initial task (setup_simulations) has finished, I try to revoke the simulation tasks as follows:

main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
from celery.task.control import revoke
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id); sub_result.revoke() # Does not work
    # revoke(sub_task_id) # Does not work neither

When I look at the output from "python manage.py celeryd -l info", the tasks get executed as if nothing had happened. Any ideas somebody what could have gone wrong?

1
  • Could the problem depend on the fact that I use Kombu as broker? (These “virtual transports” may have limited broadcast and event functionality. For example remote control commands only works with AMQP and Redis.) Commented Sep 14, 2011 at 15:31

1 Answer 1

1

As you mention in the comment, revoke is a remote control command so it's only currently supported by the amqp and redis transports.

You can accomplish this yourself by storing a revoked flag in your database, e.g:

from celery import states
from celery import task
from celery.exceptions import Ignore

from myapp.models import RevokedTasks


@task
def foo():
    if RevokedTasks.objects.filter(task_id=foo.request.id).count():
        if not foo.ignore_result:
            foo.update_state(state=states.REVOKED)
        raise Ignore()

If your task is working on some model you could even store a flag in that.

Sign up to request clarification or add additional context in comments.

5 Comments

Thank you for confirming my suspicion about Kombu. I was not sure if that was the case, because Kombu is indeed an AMQP messaging framework. Thank you very much for your proposal as well, I guess I will go with that solution then!
Is there a way to revoke a previously scheduled tasks with this workaround?
I updated the example to use Ignore, which is new in Celery 3.0.11. This makes to worker ignore the task after it returns and thus leave the state of the task alone.
@Fitoria What do you mean by previously scheduled? Any task the worker receives can be ignored this way. You can use the same technique to e.g. change the ETA/countdown of a task after it's been sent, by Ignor'ing the task and sending a new task with a new ETA. (it doesn't help if the task is already executing of course).
@asksol What's the best way to add a field on celery task table? stackoverflow.com/questions/25046200/…

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.