
I'd like to use Celery as a queue for my tasks, so my web app could enqueue a task, return a response, and have the task processed in the meantime. I'm building a kind of API, so I don't know in advance what sorts of tasks there will be: in the future, there may be tasks dealing with HTTP requests and other IO, but also CPU-bound tasks. Because of that, I'd like to run Celery's workers on processes, as processes are the universal kind of parallelism in Python.

However, I'd like to use gevent in my tasks too, so I could have a single task spawning many HTTP requests, etc. The problem is, when I do this:

from gevent import monkey
monkey.patch_all()

Celery stops working. It starts, but no tasks can be effectively enqueued: they seem to reach the broker, but the Celery worker doesn't collect and process them; it just starts and waits. If I delete those lines and run the task without any gevent parallelization, everything works.

I think it could be because gevent also patches threading. So I tried

from gevent import monkey
monkey.patch_all(thread=False)

...but then Celery doesn't even start; it crashes without giving a reason (even with the log level set to debug).

Is it possible to use Celery for enqueuing tasks and gevent for doing some stuff inside a single task? How? What am I doing wrong?

  • There may be some issues with gevent.monkey.patch_all(): github.com/kennethreitz/grequests/issues/8 Commented Nov 2, 2012 at 12:05
  • Surely. Unfortunately, it's necessary to run such a statement in order to use gevent with sockets etc. Commented Nov 2, 2012 at 12:15
  • Are you starting the event loop after starting the green threads, e.g. with gevent.joinall()? The biggest problem with this is cleaning up afterwards, I think. AFAIK you cannot temporarily patch; the process has to stay patched forever. Commented Nov 12, 2012 at 12:05
  • Yes, I first created greenlets and then used gevent.joinall(). Commented Nov 12, 2012 at 14:46

4 Answers


I believe the recommended way to start the worker is as follows.

python manage.py celery worker -P gevent --loglevel=INFO

Gevent needs to be patched as early as possible.


3 Comments

This only applies to Django projects, and there was no mention of any web framework in the question.
@myusuf3 Where should the patching happen if it has to be as early as possible? Which hook should be used?
@eligro Celery handles the monkey patching internally just by specifying the pool on the command line.

You can run celery with multiple worker nodes, each running many greenlets, like this:

$ celery multi start 4 -P gevent -l info -c:1-4 1000
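A rough breakdown of the flags above: start 4 launches four worker nodes, -P gevent selects the gevent pool, -l info sets the log level, and -c:1-4 1000 gives nodes 1 through 4 a concurrency of 1000 greenlets each. The same tool can preview and stop what it started (a sketch, assuming it is run from the same directory so the pidfiles are found):

```shell
# Preview the full worker command lines without starting anything
celery multi show 4 -P gevent -l info -c:1-4 1000

# Stop the nodes again (reads the pidfiles written at start)
celery multi stop 4
```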

3 Comments

Could not find any proper documentation for celery multi. How does it work? I tried it, but it somehow spawns processes in the background (probably unmanageable by Supervisord?).
You are also inadvertently starting multiple nodes.
I didn't find any documentation either. I found the above information here: groups.google.com/forum/?fromgroups=#!topic/celery-users/…

As far as I was able to learn, this is not possible. If someone finds a better answer, I will accept it instead of mine.

The only option is to use gevent as the pool for the Celery workers as well. To accomplish this, add the following to the config file:

CELERYD_POOL = 'gevent'

More details about this option can be found here. More information about the gevent pool is on this page. Mind that the gevent pool is still marked as experimental. I found no benchmarks comparing the process pool and the async gevent pool on different kinds of tasks (IO-bound, CPU-bound), but I eventually realized that even my CPU-bound tasks will in fact be more IO than CPU, because I use a database to save results, and the database connection, not the computation, will be the bottleneck. I will have no scientific tasks that would really hit the CPU.
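For completeness, a minimal config-module sketch using these old (Celery 3.x era) setting names; the broker URL and concurrency value are assumptions:

```python
# celeryconfig.py -- load with: celery worker --config=celeryconfig
BROKER_URL = 'amqp://guest@localhost//'  # assumed broker
CELERYD_POOL = 'gevent'                  # use the gevent pool instead of processes
CELERYD_CONCURRENCY = 500                # number of greenlets, not processes
```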

3 Comments

Trying to understand gevent pools a bit better. Doesn't that help with I/O-bound tasks? How exactly do they work?
Do not do this! See the note here: celery.readthedocs.org/en/latest/…. Instead use -P option on the command line as myusuf3 suggests.
At the time of the question being asked and answered -P wasn't there. I have accepted myusuf3's answer.

In my experience, oddly, Celery Beat can't work properly with workers using the gevent pool (scheduled tasks are blocked and wait forever), unless you activate gevent monkey patching for the Beat process.

However, celery beat doesn't support the --pool=gevent (-P gevent) option. The proper way to inject gevent monkey patching is to use a customized celery binary, such as:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Monkey-patch before anything else gets imported.
from gevent import monkey
monkey.patch_all()

import re
import sys

from celery.__main__ import main

if __name__ == '__main__':
    # Strip setuptools' "-script.pyw"/".exe" suffix so argv[0] reads "celery".
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

Save it as celery-gevent, and run Beat service as follows:

celery-gevent beat --app=proj.celery:app --loader=djcelery.loaders.DjangoLoader -f /var/log/celery/beat.log -l INFO --workdir=/my/proj --pidfile=/var/run/celery/beat.pid

In proj.celery you should also patch the Django connection to avoid DatabaseError:

from __future__ import absolute_import

import os
# Set the Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

import django
# Load Django model definitions, etc
django.setup()

from django.db import connection
# Allow thread sharing to ensure that Django database connection
# works properly with gevent.
connection.allow_thread_sharing = True

from django.conf import settings
from celery import Celery

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

(The above example works for Python 2.7.10, Celery 3.1.18, Django 1.8.2 and gevent 1.0.2)

2 Comments

Celery Beat doesn't have the -P option because it doesn't need it: there is no task pool. It's a lightweight process that periodically sends tasks to the queue. I'm using it just fine with gevent workers without any hacks like this, but then, I don't use --app or DjangoLoader, just a celery config file that lists the tasks and their schedules.
Celery does the monkey patching itself; you are doing work that is already done: docs.celeryproject.org/en/2.2/_modules/celery/concurrency/…
