1

I'm using Django database instead of RabbitMQ for concurrency reasons.

But I can't solve the problem of revoking a task before it execute.

I found some answers about this matter but they don't seem complete or I can't get enough help.

How can I extend celery task table using a model, add a boolean field (revoked) to set when I don't want the task to execute?

Thanks.

1 Answer 1

4
+50

Since Celery tracks tasks by an ID, all you really need is to be able to tell which IDs have been canceled. Rather than modifying kombu internals, you can create your own table (or memcached etc) that just tracks canceled IDs, then check whether the ID for the current cancelable task is in it.

This is what the transports that support a remote revoke command do internally:

All worker nodes keeps a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes). (from Celery docs)

When you use the django transport, you are responsible for doing this yourself. In this case it's up to each task to check whether it has been canceled.

So the basic form of your task (logging added in place of an actual operation) becomes:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

You can extend & improve this in various ways, such as by creating a base CancelableTask class as in one of the other answers you linked to, but this is the basic form. What you're missing now is the model and the function to check it.

Note that the ID in this case will be a string ID like a5644f08-7d30-43ff-a61e-81c165ad9e19, not an integer. Your model can be as simple as this:

from django.db import models

class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

You can now check the behavior by watching your celery service's debug logs while doing things like:

my_task.delay()
models.cancel_task(my_task.delay())
Sign up to request clarification or add additional context in comments.

1 Comment

Thank you it's working :D. With your explanation it's much clearer now :). After all it was nothing difficult. info: I use custom task_id so there is no need to manual log the operations. thanks again.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.