
I’m using Celery with Redis and RabbitMQ as brokers, and Flower for monitoring. I have an API endpoint to upload documents, which triggers a first Celery task to process the initial batch.

Inside this first Celery task, based on the count of sub-batches, I trigger a second Celery task to split the batch further and process the sub-batches in parallel.

The problem is:

Sometimes the second task stays in a pending state and never starts executing—no logs from the second task appear.

Other times, the second task is picked up and processed successfully by the Celery worker.

Both workers are running and show as ready in Flower.

The first task runs fine and triggers the second task.

There are no errors or warnings in the worker logs.

I start the Celery workers as follows:

    source /usr/src/app/Configs/environ2.sh && \
    celery -A Application.tasks worker --pool=gevent -E --loglevel=INFO --concurrency=6
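One thing I have since read is that a worker started without an explicit `-Q` only consumes from the default `celery` queue, so tasks routed to any named queue would sit in PENDING forever. A variant of my startup command with explicit queues and a worker name would presumably look like this (the queue names `uploads` and `sub_batches` are just placeholders, not my real configuration):

```shell
# Worker bound to explicit queues via -Q; without -Q it only consumes
# the default "celery" queue. -n gives the worker a distinct name.
source /usr/src/app/Configs/environ2.sh && \
celery -A Application.tasks worker \
    --pool=gevent -E --loglevel=INFO --concurrency=6 \
    -Q uploads,sub_batches \
    -n upload_worker@%h
```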

I’m struggling to find a pattern or root cause for why the second Celery task intermittently does not get executed.

Could this be related to how I trigger the second task inside the first? Or some concurrency or queue configuration issue? What debugging steps or best practices should I follow to ensure the second task always gets picked up reliably?
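So far the only checks I know of are the built-in inspect commands and looking at the broker directly, e.g.:

```shell
# Which tasks each worker is currently executing / has prefetched
celery -A Application.tasks inspect active
celery -A Application.tasks inspect reserved

# Which queues each worker is actually consuming from
celery -A Application.tasks inspect active_queues

# With the Redis broker, check whether messages are piling up in a queue
# ("celery" is the default queue name unless it was changed)
redis-cli llen celery
```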

I tried putting the first and second Celery tasks in the same queue, but that did not help. How can I make this pattern work reliably, so the second task never silently fails to start?
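Concretely, "same queue" for me meant a routing configuration roughly like this (the queue name and the task paths are simplified guesses at my module layout, not verbatim config):

```python
# Celery routing sketch: send both tasks to one named queue.
# The workers must then be started with `-Q uploads`; otherwise they keep
# consuming only the default "celery" queue and these tasks stay PENDING.
CELERY.conf.task_routes = {
    "Application.tasks.call_upload": {"queue": "uploads"},
    "Application.tasks.parallel_batch_process": {"queue": "uploads"},
}
```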

This is the first Celery task:

    @CELERY.task(max_retries=0)
    def call_upload(q_id, b_id, user_id, uploaded_zip_folders, batch_display_name,
                    batch_display_id, e_id, method, token, kwargs):
        message, return_response, batch_id = o_uploader.upload(
            q_id, b_id, user_id, uploaded_zip_folders, batch_display_name,
            batch_display_id, e_id, kwargs)

Inside the upload function, the second Celery task is triggered (via parallel_batch_process):

    try:
        #######################################################################
        # Iterating through the Jobs assigned to a Batch.
        with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust the number of workers as needed
            futures = []
            for folder_id, job_ids in folder_job_ids.items():
                f = executor.submit(self.parallel_batch_process,
                                    job_ids, b_id, q_id, folder_id, kwargs, logger)
                f.add_done_callback(
                    lambda fut, logger=logger: (
                        logger.exception("Error inside parallel_batch_process")
                        if fut.exception() else None
                    )
                )
                futures.append(f)

            failures = 0
            for future in as_completed(futures):
                try:
                    result = future.result()
                    if result == -2:
                        failures += 1
                        logger.error(f"UID: {uid} -- A folder job failed during parallel execution")
                except Exception as e:
                    failures += 1
                    logger.error(f"UID: {uid} -- An error occurred during parallel execution: {str(e)}")

        if failures > 0:
            return -2

        return 1
  • Please provide enough code so others can better understand or reproduce the problem. Commented May 23 at 8:13
  • @CELERY.task(max_retries=0) def call_upload(q_id, b_id, user_id,uploaded_zip_folders,batch_display_name,batch_display_id,e_id,method,token,kwargs): message, return_response,batch_id = o_uploader.upload(q_id, b_id, user_id,uploaded_zip_folders,batch_display_name,batch_display_id,e_id,kwargs) Commented May 26 at 6:32
  • Could you please reformat your text as it is impossible to understand the code... What I am looking for is how you handle the second celery task as that is most likely where the issue is. Commented May 31 at 11:25
