celery --version 5.1.2 (sun-harmonics)

django --version 3.2.8

I have a Celery beat schedule with four tasks that run in different timezones. I use nowfun to set the timezone for each task and have set CELERY_ENABLE_UTC = False in settings.py. I followed the top answer on this SO post: Celery beat - different time zone per task

Note that I made this change this morning - I was running a previous version of the code without these settings.

Currently, I am saving the celery results to CELERY_RESULT_BACKEND = 'django-db'.

Since implementing the change that allows for different tasks to be run according to different timezones I am getting an error when I run celery -A backend beat -l info.

It's super long, so here is the head and tail:

Head:

[2021-10-29 07:29:36,059: INFO/MainProcess] beat: Starting...
[2021-10-29 07:29:36,067: ERROR/MainProcess] Cannot add entry 'celery.backend_cleanup' to database schedule: ValidationError(["Invalid timezone '<LocalTimezone: UTC+00>'"]). Contents: {'task': 'celery.backend_cleanup', 'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>, 'options': {'expire_seconds': 43200}}

Tail:

django.core.exceptions.ValidationError: ["Invalid timezone '<LocalTimezone: UTC+00>'"]

Celery beat hangs on this last error message and I have to kill it with ctrl + c.

I went to the Celery documentation and read the instructions about manually resetting the database when timezone-related settings change. The website says:

$ python manage.py shell

>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

I then found some documentation that said:

Warning: If you change the Django TIME_ZONE setting your periodic task schedule will still be based on the old timezone. To fix that you would have to reset the “last run time” for each periodic task:

from django_celery_beat.models import PeriodicTask, PeriodicTasks

PeriodicTask.objects.all().update(last_run_at=None)

PeriodicTasks.changed()

Note that this will reset the state as if the periodic tasks have never run before.

So I think the cause of the problem is exactly what it says above: I changed timezones, the schedule is still running on the old UTC timezone, and I need to update it. My schedules have run before, though, so when I type:

>>> PeriodicTask.objects.all().update(last_run_at=None)

I get the response:

13

and then when I enter:

>>> PeriodicTasks.changed()

I get a type error:

TypeError: changed() missing 1 required positional argument: 'instance'

So my question is:

What do I do to update PeriodicTask and PeriodicTasks? What arguments should I pass to PeriodicTasks.changed(), and is 13 the expected response for the first command?

Here is my celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
import pytz
from datetime import datetime

os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'backend.settings'
)

app = Celery(
    'backend'
)

app.config_from_object(
    settings,
    namespace='CELERY'
)

def uk_time():
    return datetime.now(pytz.timezone('Europe/London'))

def us_time():
    return datetime.now(pytz.timezone('EST'))

def jp_time():
    return datetime.now(pytz.timezone('Japan'))

# Celery Beat Settings
app.conf.beat_schedule={
    'generate_signals_london': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=uk_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('UK',),
    },

    'generate_signals_ny': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=7,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NY',),
    },

    'generate_signals_nyse': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=9,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NYSE',),
    },

    'generate_signals_asia': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=jp_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('JP',),
    },

}

app.autodiscover_tasks()

1 Answer

When the tasks in a schedule run in different timezones and those timezones observe DST, a static schedule drifts whenever the clocks change, so the schedule needs to be regenerated dynamically.
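To make the DST point concrete, here is a small sketch of how the same local wall-clock hour lands on different UTC hours over the year. It uses the standard-library zoneinfo module rather than the pytz used elsewhere in this thread, and the helper name utc_hour_for_local and the sample dates are mine, chosen only for illustration. It also shows that the 'EST' zone from the question is a fixed UTC-5 offset that never switches to daylight time, unlike 'America/New_York'.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9; needs system tz data


def utc_hour_for_local(year, month, day, hour, zone_name):
    """Return the UTC hour at which `hour`:00 local time occurs in `zone_name`."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(zone_name))
    return local.astimezone(timezone.utc).hour


# 08:00 in London maps to different UTC hours in winter (GMT) and summer (BST).
london_winter = utc_hour_for_local(2021, 1, 15, 8, "Europe/London")
london_summer = utc_hour_for_local(2021, 7, 15, 8, "Europe/London")

# "EST" stays at UTC-5 all year; "America/New_York" follows US daylight saving.
est_summer = utc_hour_for_local(2021, 7, 15, 7, "EST")
new_york_summer = utc_hour_for_local(2021, 7, 15, 7, "America/New_York")

print(london_winter, london_summer, est_summer, new_york_summer)
```

A schedule pinned to fixed UTC hours (or to a fixed-offset zone such as 'EST') would therefore fire an hour off for part of the year, which is why the crontab entries below carry a named timezone.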

Create a task that updates the beat schedule objects in the database:

import os
from django import setup
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
setup()
app = Celery('api')
app.conf.timezone = 'UTC'

app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.broker_connection_retry_on_startup = True

# Register database scheduler for beat
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

# Register our `update_beat_schedule` task to run every Sunday at 20:00 UTC
app.conf.beat_schedule = {
    'update_beat_schedule': {
        'task': 'utility_app.tasks.update_beat_schedule',
        'schedule': crontab(hour=20, minute=0, day_of_week='sun'),
        'args': ()
    },
}

app.autodiscover_tasks()
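The app.conf.timezone = 'UTC' line above is probably the part that addresses the Invalid timezone '<LocalTimezone: UTC+00>' error from the question. As far as I can tell, with CELERY_ENABLE_UTC = False and no timezone configured, Celery falls back to a local-timezone object that has no zone name, and the database scheduler cannot store it when it adds the built-in celery.backend_cleanup entry. If you prefer to keep this in settings.py, a minimal sketch assuming the CELERY namespace used in the question could look like this (the value 'UTC' is only an example; any named zone should work):

```python
# settings.py (fragment): give Celery a *named* timezone so that
# django-celery-beat can persist it alongside its crontab rows.
CELERY_TIMEZONE = "UTC"

# Same scheduler as in celery.py above, expressed as a namespaced setting.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
```

I have not verified this against every Celery and django-celery-beat version, so treat it as a starting point rather than a guaranteed fix.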

Then have the task build the schedule with everything it needs and update the PeriodicTask model. I look up each task by name first so that instances that already exist are updated; otherwise a new instance would be created on every run.

from django_celery_beat.models import PeriodicTask, CrontabSchedule
from celery import shared_task
import json
from pytz import timezone
from datetime import datetime
from utility_app.utils import first_business_days

class UtilsAppError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(f"{message}")

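def get_mt4_timezone():
    # This code treats MT4 server time as UTC+3 during US daylight saving, else UTC+2.
    # POSIX-style 'Etc/GMT-N' names have inverted signs, so 'Etc/GMT-3' means UTC+3.
    eastern = timezone('US/Eastern')
    # Ask for the current time in US/Eastern directly; localizing a naive
    # datetime.now() would reinterpret the server's local clock as Eastern time.
    is_dst = bool(datetime.now(eastern).dst())
    return 'Etc/GMT-3' if is_dst else 'Etc/GMT-2'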

def get_year_month_day():
    tz = timezone(get_mt4_timezone())
    current_mt4_datetime = datetime.now(tz)
    current_month = current_mt4_datetime.month
    current_year = current_mt4_datetime.year
    current_day = current_mt4_datetime.day
    return current_year, current_month, current_day

def get_day_of_month_or_week(period='month'):
    year, month, day = get_year_month_day()
    first_business_day_next_month, first_business_day_following_week = first_business_days(year, month, day)
    day_of_month = first_business_day_next_month.day
    # Crontab numbers weekdays from Sunday=0, while date.weekday() starts at Monday=0,
    # so convert via isoweekday() (Monday=1 ... Sunday=7) modulo 7.
    day_of_week = first_business_day_following_week.isoweekday() % 7
    return day_of_month if period == 'month' else day_of_week


@shared_task
def update_beat_schedule():
    try:
        mt4_timezone = get_mt4_timezone()
        day_of_month = get_day_of_month_or_week('month')
        day_of_week = get_day_of_month_or_week('week')

        tasks_to_update = [
            {
                'name': 'monthly_analysis', 
                'task': 'signals_app.tasks.technical_analysis', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'day_of_month': day_of_month, 
                'args': (mt4_timezone,)
            },
            {
                'name': 'weekly_analysis', 
                'task': 'signals_app.tasks.technical_analysis', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'day_of_week': day_of_week, 
                'args': (mt4_timezone,)
            },
            {
                'name': 'tokyo_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'args': ('Tokyo', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'london_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8, 
                'timezone':mt4_timezone, 
                'args': ('London', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'ny_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 12, 
                'timezone':mt4_timezone, 
                'args': ('NewYork', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'nyse_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 16, 
                'timezone':mt4_timezone, 
                'args': ('NYSE', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'tokyo_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 9, 
                'timezone':'Asia/Tokyo', 
                'args': ('Tokyo', 'market_open', mt4_timezone)
            },
            {
                'name': 'london_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8,
                'timezone':'Europe/London', 
                'args': ('London', 'market_open', mt4_timezone)
            },
            {
                'name': 'ny_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8, 
                'timezone':'US/Eastern', 
                'args': ('NewYork', 'market_open', mt4_timezone)
            },
            {
                'name': 'nyse_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 10, 
                'timezone':'US/Eastern', 
                'args': ('NYSE', 'market_open', mt4_timezone)
            }
        ]

        for task in tasks_to_update:
            # First, try to find the PeriodicTask by name.
            periodic_task = PeriodicTask.objects.filter(name=task['name']).first()

            if periodic_task:
                # If it exists, update its CrontabSchedule
                crontab = periodic_task.crontab
                crontab.hour = task['hour']
                crontab.minute = 0
                crontab.day_of_month = task.get('day_of_month', '*')
                crontab.day_of_week = task.get('day_of_week', '*')
                crontab.timezone = task['timezone']
                crontab.save()
            else:
                # If it doesn't exist, create a new CrontabSchedule and PeriodicTask
                crontab, _ = CrontabSchedule.objects.get_or_create(
                    hour=task['hour'],
                    minute=0,
                    day_of_month=task.get('day_of_month', '*'),
                    day_of_week=task.get('day_of_week', '*'),
                    timezone=task['timezone']
                )
                PeriodicTask.objects.create(
                    name=task['name'],
                    task=task['task'],  # dotted path of the Celery task to run
                    crontab=crontab,
                    args=json.dumps(task.get('args', []))
                )

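    except Exception as e:
        # `task` is unbound if the failure happens before the loop starts, so it is
        # not referenced here; chain the original exception to keep its traceback.
        raise UtilsAppError(f"Error updating beat schedule: {e}") from e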
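
On the TypeError from the question: as far as I can tell from the django-celery-beat source, PeriodicTasks.changed is written as a signal receiver that expects the saved or deleted instance, while PeriodicTasks.update_changed is the classmethod that takes no arguments and simply bumps the "schedule changed" timestamp. The sketch below is not the real model. PeriodicTasksStandIn is a hypothetical plain-Python stand-in that only mirrors those two signatures, to show why calling changed() with no arguments fails.

```python
from datetime import datetime, timezone


class PeriodicTasksStandIn:
    """Hypothetical stand-in that mirrors the two classmethod signatures only."""

    last_update = None

    @classmethod
    def changed(cls, instance, **kwargs):
        # Shaped like a model-signal receiver: it needs the affected instance.
        if not getattr(instance, "no_changes", False):
            cls.update_changed()

    @classmethod
    def update_changed(cls, **kwargs):
        # Needs no instance; it only records that the schedule has changed.
        cls.last_update = datetime.now(timezone.utc)


try:
    PeriodicTasksStandIn.changed()  # same mistake as in the question
except TypeError as exc:
    error_message = str(exc)

PeriodicTasksStandIn.update_changed()  # the argument-free call succeeds
```

In the Django shell, the equivalent would presumably be PeriodicTasks.update_changed() after the update(last_run_at=None) call. The 13 is expected: QuerySet.update() returns the number of rows it matched, so it means 13 periodic tasks were reset.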