2

I'm trying to run a task using celery. I need to send post requests to a remote server while the user presses the send button, So I tried using celery with Redis here with this configuration in settings file:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

according to documentation for apply_async I can define retry options like the code below:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=(*args),
                                group=user_key,
                                expires=__task_expiration,
                                retry=True,
                                retry_policy={
                                  "max_retries": 3,
                                  "interval_start": __interval_start
                                })

In documentation I found this definition for apply_async:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

and following the documentation, I can set this using retry and retry_policy

enter image description here

and a sample code for how to define retry options

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

I want my task to run 3 times to run in case of any failure, and the interval between each retry to 60 seconds. my task definition looks like this:

@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }

    json_schema = generate_json(*args)

    response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)

    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

but when my code fails, I don't see any retry mechanism in celery logs, what is the problem here? how can I define retry when calling my tasks using the apply_async method? I'm raising NameError("Exception") for telling the worker that an error has occurred.

1 Answer 1

3

[EDIT 1: Added acks_late]

There are two things that can go wrong when you send a task to a Celery worker:

  1. Connection issues with the broker and Message Queue.
  2. Exceptions raised on the worker.

The first issue can be solved by defining retry and retry_policy as you did.

The second kind (which is what you want to solve), can be solved by calling self.retry() upon a task failure.

Depending on your type of problem, it might be helpful to set CELERY_ACKS_LATE = True.

Check out these links for further information:

Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/

Sign up to request clarification or add additional context in comments.

1 Comment

Thanks this helped a lot, you were right. I tried using a decorator for setting retry when an exception occurs.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.