We have Python 3.6.1 set up with Django, Celery, and Rabbitmq on Ubuntu 14.04. Right now, I'm using the Django debug server (for dev and Apache isn't working). My current problem is that the celery workers get launched from Python and immediately die -- processes show as defunct. If I use the same command in a terminal window, the worker gets created and picks up the task if there is one waiting in the queue.
Here's the command:
celery worker --app=myapp --loglevel=info --concurrency=1 --maxtasksperchild=20 -n celery_1 -Q celery
The same functionality occurs for whichever queues are being set up.
In the terminal, we see the output myapp.settings - INFO - Loading... followed by output that describes the queue and lists the tasks. When running from Python, the last thing we see is the Loading...
In the code, we do have a check to be sure we are not running the celery command as root.
These are the Celery settings from our settings.py file:
CELERY_ACCEPT_CONTENT = ['json','pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ('api.tasks',)
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_CONCURRENCY = 1
BROKER_POOL_LIMIT = 120 # Note: I tried this set to None but it didn't seem to make any difference
CELERYD_LOG_COLOR = False
CELERY_LOG_FORMAT = '%)asctime)s - $(processName)s - %(levelname)s - %(message)s'
CELERYD_HIJACK_ROOT_LOGGER = False
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(psconf.BASE_DIR, 'myapp_static/')
BROKER_URL = psconf.MQ_URI
CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True
CELERY_ROUTES = {}
for entry in os.scandir(psconf.PLUGIN_PATH):
if not entry.is_dir() or entry.name == '__pycache__':
continue
plugin_dir = entry.name
settings_file = f'{plugin_dir}.settings'
try:
plugin_tasks = importlib.import_module(settings_file)
queue_name = plugin_tasks.QUEUENAME
except ModuleNotFoundError as e:
logging.warning(e)
except AttributeError:
logging.debug(f'The plugin {plugin_dir} will use the general worker queue.')
else:
CELERY_ROUTES[f'{plugin_dir}.tasks.run'] = {'queue': queue_name}
logging.debug(f'The plugin {plugin_dir} will use the {queue_name} queue.')
Here is the part that kicks off the worker:
class CeleryWorker(BackgroundProcess):
def __init__(self, n, q):
self.name = n
self.worker_queue = q
cmd = f'celery worker --app=myapp --loglevel=info --concurrency=1 --maxtasksperchild=20 -n {self.name" -Q {self.worker_queue}'
super().__init__(cmd, cwd=str(psconf.BASE_DIR))
class BackgroundProcess(subprocess.Popen):
def __init__(self, args, **kwargs):
super().__init__(args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, **kwargs)
Any suggestions as to how to get this working from Python are appreciated. I'm new to Rabbitmq/Celery.
the celery workers get launched from Python and immediately die... But I don't see any Python code that is actually launching the celery worker. Could you please include that or describe this in more detail?