0

I'm using Celery to consume messages from SQS queue.

The queue is Standard type.

There are cases [exceptions caught] when I explicitly re-enqueue tasks back to the queue.

    def run(self):
        try:
            # some exceptions occurred
            ...                    
        except Exception as e:
            log.error(str(e), exc_info=True)
            self.enqueue_message()
            return

    def enqueue_message(self, task='llm_extraction_task', queue='llm-extraction-queue'):
        # TODO retries mechanism
        llm_app.send_task(name=task, kwargs=self.message, queue=queue)

Messages are consumed:

@shared_task(name= "llm_extraction_task")
def check_nlp_job_status(**kwargs):
    log.info("Message received in llm_extraction_task")
    consume_llm_data_obj = ConsumeLLMData(payload=kwargs)
    consume_llm_data_obj.run()

This will immediately push the message back to the queue.

The problem is that with a fewer messages - say a SINGLE message - the same re-enqueued message is immediately consumed back.

I want to add a timer or delay to such messages so that failed messages are prioritized or picked later.

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.