3

Based on these examples:

https://blog.balthazar-rouberol.com/celery-best-practices

https://shulhi.com/2015/10/13/class-based-celery-task/

Class based Task in django celery

I would like to create something similar:

class CalculationWorker(Task):

    def __init__(self, id_user):
        self._id_user = id_user
        self._user = get_object_or_404(User, pk=self._id_user)

    # I need to understand if the bind work or if it's needed
    def _bind(self, app):
        return super(self.__class__, self).bind(celery_app)

    def _retrieve_some_task(self):
        # long calculation

    def _long_run_task(self):
        # long calculation
        self._retrieve_some_task()

    # Main entry
    def run(self):
        self._long_run_task()


# 
def run_job():  
    worker = CalculationWorker(id_user=323232)
    task = worker.apply_async()

The documentation seems to say it's possible (anyway it's not clear to me) http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes. It even says:

""" This means that the init constructor will only be called once per process, and that the task class is semantically closer to an Actor. ""

but http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks explictly says: "The best practice is to use custom task classes only for overriding general behavior, and then using the task decorator to realize the task".

As result I got a NotRegistered Exception due to this https://github.com/celery/celery/issues/3548, but adding app.tasks.register(CalculationWorker()) didn't solve it. I'm using Django 1.10.X and Celery 4.0.0

Is that approach still valid?

Thanks

2 Answers 2

9

If you use celery-4.0.1, then you should check documentation that chris 1 pointed out docs

The Task class is no longer using a special meta-class that automatically registers the task in the task registry.

Now you should register your task like this

class CustomTask(Task):
    def run(self):
        print('running')
app.register_task(CustomTask())
Sign up to request clarification or add additional context in comments.

2 Comments

I did the way that you show, do you know how to add attributes like timeout with this manually register_tasks?
@Leow yep, check class Task in Celery, you will see that it's just class attributes that are filled from config in case they are None
8

I'm not sure if this is the best solution, but you can use a workaround to get the behavior you want. I'm doing something similar so that I can stick an error handler on the task in a clearer way.

From the docs 1

The best practice is to use custom task classes only for overriding general behavior, and then using the task decorator to realize the task:

@app.task(bind=True, base=CustomTask) 
def custom(self):
    print('running')

But you can put all of your task code inside CustomTask and just leave a stub in the decorated task declaration. You do have to call into the superclass of your task in the stub like so:

@app.task(bin=True, base=CustomTask)
def custom(self, *args):
    super(type(self), self).run(*args)

You then treat the decorated function declaration as a way to call into celery's task registration machinery. I hope something cleaner comes out in 5.0 though.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.