13

i looked at celery documentation and trying something from it but it not work like the example. maybe i'm wrong at some point, please give me some pointer if i'm wrong about the following code

in views.py i have something like this:

class Something(CreateView):
  model = something

  def form_valid(self, form):
    obj = form.save(commit=False)
    number = 5
    test_limit = datetime.now() + timedelta(minutes=5)
    testing_something.apply_async((obj, number), eta=test_limit)
    obj.save()

and in celery tasks i wrote something like this:

@shared_task()
def add_number(obj, number):
    base = Base.objects.get(id=1)
    base.add = base.number + number
    base.save()
return obj

my condition with this code is the celery runs immediately after CreateView runs, my goal is to run the task add_number once in 5 minutes after running Something CreateView. Thank You so much

Edit:

  1. i've tried change the eta into countdown=180 but it still running function add_number immediately. i also tried longer countdown but still running immediately
  2. i've tried @johnmoustafis answer but still the same, the task run immediately
  3. i've also tried @dana answer but it still the same, the task run immediately

4 Answers 4

8

Celery by default uses UTC time.
If your timezone is "behind" the UTC (UTC - HH:MM) the datetime.now() call will return a timestamp which is "behind" UTC, thus causing your task to be executed immediately.

You can use datetime.utcnow() instead:

test_limit = datetime.utcnow() + timedelta(minutes=5)

Since you are using django, there exist another option:

If you have set the USE_TZ = True in your setting.py, you have enabled the django timezone settings and you can use timezone.now() instead of datetime.utcnow():

from django.utils import timezone

...

test_limit = timezone.now() + timedelta(minutes=5)
Sign up to request clarification or add additional context in comments.

6 Comments

i've tried the timezone.now() and datetime.utcnow() even i tried using my_date = datetime.now(pytz.timezone('US/Pacific')) just now but still the same
@knightzoid Have you set the appropriate timezone in django.settings? docs.djangoproject.com/en/1.11/ref/settings/…
yes i set the timezone in settings.py TIME_ZONE = 'US/Pacific'
i've done some debugging on terminal using pdb so i can see when exactly the time_limit are, but it's show the correct time for me
@knightzoid try replacing eta with countdown=time_in_seconds to check if this is working with the expected delay or the problem is elsewhere. Also try to set CELERY_ALWAYS_EAGER=False and retry with timezone.now() etc.
|
1

'test_limit' variable hasn't got timezone information. So Celery will understand eta param as UTC time.

Please use modified code:

class Something(CreateView):
    model = something

    def form_valid(self, form):
        obj = form.save(commit=False)
        number = 5

        test_limit = datetime.now()
        test_limit = test_limit.replace(tzinfo=tz.tzlocal())
        test_limit = test_limit + timedelta(minutes=5)

        testing_something.apply_async((obj, number), eta=test_limit)
        obj.save()

Comments

0

You might have the CELERY_ALWAYS_EAGER=True setting.

Could you also post your configuration and the Celery version you are using?

Here you might find some useful information.

3 Comments

i change CELERY_ALWAYS_EAGER to False but still did the same. i use celery==3.1.19 and here's my celery settings on settings.py CELERY_IMPORTS = ("somewhere.utils.tasks", ) CELERYBEAT_SCHEDULE = { 'send-post-office-mails': { 'task': 'somewhere.utils.tasks.send_post_office_mails', 'schedule': crontab(), }, 'rebuild-index': { 'task': 'somewhere.utils.tasks.rebuild_search_index', 'schedule': crontab(minute=0, hour='*'), }, } CELERY_ALWAYS_EAGER = False
Some issues I see with your settings is that you don't define a BROKER_URL and that might be the problem. Following this guide might help you.
i'm using redis as my broker and i put this also in my settings.py BROKER_URL = env("DJANGO_BROKER_URL", default='redis://localhost:6379/0')
0

I was facing the same issue with celery version 5.1.0 and I got to know that the celery config "CELERY_ALWAYS_EAGER" name has been changed to "CELERY_TASK_ALWAYS_EAGER" in version 4.0+.

So make sure you have set CELERY_TASK_ALWAYS_EAGER=False if you are using celery version 4.0+

Celery task with apply_async method should execute with specified delay in eta or countdown and both should work according to apply_async definition

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

Following code works fine for me.

item/tasks.py

@app.task
def delete_item(item_pk):
    try:
        item = Item.objects.get(pk=item_pk)
        item.delete()
    except ObjectDoesNotExist:
        logging.warning(f"Cannot find item with id: {item_pk}.")

Now you can call this function in your logic in the following ways:

...

delete_item.apply_async((item.pk,), countdown=60)  # execute after 1 minute
...

(or)

from datetime import datetime, timedelta

...
eta = datetime.now() + timedelta(seconds=60)
delete_item.apply_async((item.pk,), eta=eta)  # execute after 1 minute
...

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.