0

In my Django project I need to run celery task periodically. I use Celery 4 + Redis.

First I run redis-server, then use next commands:

$ celery -A TestProject worker -l info
$ celery -A TestProject beat -l info

First command raise next error:

The full contents of the message body was:
b'[[], {}, {"errbacks": null, "chord": null, "chain": null, "callbacks": null}]' (77b)
Traceback (most recent call last):
  File "/home/ubuntu/enjoy_jumping/lib/python3.5/site-packages/celery/worker/consumer/consumer.py", line 557, in on_task_received
    strategy = strategies[type_]
KeyError: 'profile.tasks.amount_counting'

celery.py (file in the same directory as settings.py file)

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestProject.settings')

app = Celery('TestProject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

init.py: (file in the same directory as settings.py file)

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Almaty'

# Other Celery settings
CELERY_BEAT_SCHEDULE = {
    'amount-counting': {
        'task': 'profile.tasks.amount_counting',
        'schedule': timedelta(seconds=60), 
    }
}

tasks.py: (file in the profile app's folder)

from __future__ import absolute_import, unicode_literals
from celery import task


@task()
def amount_counting():
    # Code here

1 Answer 1

2

Finally I found the solution. I edit celery.py file:

import os
import sys
from celery import Celery
from celery._state import _set_current_app
import django
from django.conf import settings


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestProject.settings')
app = Celery('TestProject')
app.config_from_object('django.conf:settings', namespace='CELERY')
_set_current_app(app)
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../TestProject')))
django.setup()
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.