I am setting up some Celery queues to process tasks. Each task hashes a value and is sent to a dedicated queue based on that hash. What I want to achieve is to make sure the messages in any one Celery queue are executed sequentially.
The task looks like this:
@app.task(
    acks_late=True,
    autoretry_for=(OperationalError,),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=1,
)
def handle_a_task(task):
    # do something
    pass
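Get a dedicated Celery queue:
from hashlib import md5
def get_queue(task_value):
    # md5 needs bytes; the hex digest gives a string usable as a queue name
    return md5(str(task_value).encode()).hexdigest()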
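For illustration, here is a minimal, self-contained sketch of the routing idea: the same value always hashes to the same queue name, so every task for that value lands in one queue. The name `queue_name_for`, the `tasks-` prefix and the fixed `NUM_QUEUES` bucket count are assumptions made for this example only, not part of my actual setup.

```python
import hashlib

NUM_QUEUES = 4  # hypothetical bucket count, chosen only for this example


def queue_name_for(task_value, num_queues=NUM_QUEUES):
    """Map a value to a stable queue name via its MD5 digest."""
    # md5 needs bytes, so stringify and encode the value first.
    digest = hashlib.md5(str(task_value).encode("utf-8")).hexdigest()
    # Reduce the 128-bit digest to one of num_queues buckets.
    bucket = int(digest, 16) % num_queues
    return f"tasks-{bucket}"


if __name__ == "__main__":
    # The two calls with 123 print the same queue name.
    for value in (123, 123, 456):
        print(value, "->", queue_name_for(value))
```

Routing like this only decides which queue a message lands in. It does not by itself stop several workers from consuming that queue at once, which is the part I am stuck on.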
Send the task to the queue:
queue = get_queue(task_value)  # task_value is the value being hashed
handle_a_task.apply_async(
    kwargs={
        "task": {"id": 123},
    },
    queue=queue,
)
In production there are multiple Celery workers. I've seen these workers take messages from the same queue and process them at the same time. What can I do to make the messages in one Celery queue get processed in order?
I've tried running celery worker --prefetch-multiplier=1, but it doesn't help: messages from the same queue are still processed concurrently.