I’m using Celery with Redis and RabbitMQ as brokers, and Flower for monitoring. I have an API endpoint to upload documents, which triggers a first Celery task to process the initial batch.
Inside this first Celery task, based on the count of sub-batches, I trigger a second Celery task to split the batch further and process the sub-batches in parallel.
The problem is:

- Sometimes the second task stays in a PENDING state and never starts executing; no logs from the second task appear.
- Other times, the second task is picked up and processed successfully by the Celery worker.
- Both workers are running and show as ready in Flower.
- The first task runs fine and triggers the second task.
- There are no errors or warnings in the worker logs.
I start the Celery workers as follows:

source /usr/src/app/Configs/environ2.sh && celery -A Application.tasks worker --pool=gevent -E --loglevel=INFO --concurrency=6
I’m struggling to find a pattern or root cause for why the second Celery task intermittently does not get executed.
Could this be related to how I trigger the second task inside the first? Or some concurrency or queue configuration issue? What debugging steps or best practices should I follow to ensure the second task always gets picked up reliably?
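One common cause of a task that stays PENDING forever is that it was published to a queue no running worker consumes. As a sanity check, both tasks could be routed to explicitly named queues, with the worker started so it consumes exactly those queues. A minimal sketch (the task paths and queue names below are assumptions; adjust them to the actual module and task names):

```python
# Hypothetical task paths and queue names -- adjust to match your project.
# With explicit routes, a task can only land on the queue you named here,
# which rules out "the task went to a queue nobody consumes" as a cause.
task_routes = {
    "Application.tasks.call_upload": {"queue": "uploads"},
    "Application.tasks.process_sub_batch": {"queue": "sub_batches"},
}

# Start the worker so it consumes BOTH queues (or run one worker per queue):
#   celery -A Application.tasks worker -Q uploads,sub_batches \
#       --pool=gevent -E --loglevel=INFO --concurrency=6
```

With this in place, Flower shows per-queue traffic, so a task sitting in `sub_batches` with no consumer becomes immediately visible.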
I tried putting the first and second Celery tasks on the same queue, but that did not help. How can I make this Celery setup work without tasks getting lost?
This is the first Celery task:
@CELERY.task(max_retries=0)
def call_upload(q_id, b_id, user_id, uploaded_zip_folders, batch_display_name,
                batch_display_id, e_id, method, token, kwargs):
    message, return_response, batch_id = o_uploader.upload(
        q_id, b_id, user_id, uploaded_zip_folders, batch_display_name,
        batch_display_id, e_id, kwargs
    )
Inside the upload function, the second Celery task is triggered:
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    # Iterate through the jobs assigned to a batch.
    with ThreadPoolExecutor(max_workers=4) as executor:  # adjust worker count as needed
        futures = []
        for folder_id, job_ids in folder_job_ids.items():
            f = executor.submit(
                self.parallel_batch_process,
                job_ids, b_id, q_id, folder_id, kwargs, logger,
            )
            # logger.exception only has a traceback to log while an exception
            # is being handled; in a done callback, pass exc_info explicitly.
            f.add_done_callback(
                lambda fut, logger=logger: (
                    logger.error(
                        "Error inside parallel_batch_process",
                        exc_info=fut.exception(),
                    )
                    if fut.exception()
                    else None
                )
            )
            futures.append(f)

        failures = 0
        for future in as_completed(futures):
            try:
                result = future.result()
                if result == -2:
                    failures += 1
                    logger.error(f"UID: {uid} -- A folder job failed during parallel execution")
            except Exception as e:
                failures += 1
                logger.error(f"UID: {uid} -- An error occurred during parallel execution: {e}")

    if failures > 0:
        return -2
    return 1