6

I configured my project, referring to this answer: How to use Flask-SQLAlchemy in a Celery task

My extension.py file:

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab


class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()

Then in my app_settings.py, I created this app:

app = Flask('app', instance_relative_config=True)

And configured celery:

celery.init_app(app)

I ran the flask project with python manage.py run:

app.run(
        debug=settings.get('DEBUG', False),
        host=settings.get('HOST', '127.0.0.1'),
        port=settings.get('PORT', 5000)
    )

And ran celery:

celery -A manage.celery worker --beat -l debug

Celery log looks good:

[tasks]
  . app.api.tasks.spin_file
  . app.main.tasks.send_async_email
  . celery.backend_cleanup
  . celery.chain
  ...

Then in views.py, I call this task:

send_async_email.delay(*args, **kwargs)

But all tasks are being ignored by Celery. Nothing happens, no errors, no warnings. Nothing. What am I doing wrong?

EDIT: When I start celery with this command: celery -A manage.celery worker --beat -l debug

I get the following warning:

[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
  pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
0

1 Answer 1

3

I am not sure that this will help you but I am using this code on many of my projects when ever I need celery:

from flask import Flask, request, jsonify as jsn
from celery import Celery
app = Flask(__name__)
app.config.update(dict(
    SECRET_KEY='blabla'
    )
)
# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'database'
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
app.config['CELERY_TRACK_STARTED'] = True
app.config['CELERY_SEND_EVENTS'] = True

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def do_something(data):

    from celery import current_task
    import os
    import subprocess
    with app.app_context():
        #run some bash script with some params in my case

And then I am running celery with supervisor via:

#!/bin/bash
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file

And of course in my route i have somthing like

@app.route('/someroute', methods=["POST"])
def someroute():
    result = do_something.delay(data)
    print result.id
Sign up to request clarification or add additional context in comments.

2 Comments

Man , nothing works on my side, tried to follow your approach. Celery worker and heartbeat runs , but nothing is stored in the database
When I use the example above it works fine. However, if I run this in my own code, I require redis (ImportError: Missing redis library (pip install redis)). Why does this occur?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.