Celery beat: Change to individual timezone tasks causing validation error "Invalid timezone"

celery --version 5.1.2 (sun-harmonics)

django --version 3.2.8

I have a celery schedule that has four tasks that run in different timezones. I am using nowfun for setting the timezones and have set CELERY_ENABLE_UTC = False in settings.py. I followed the top response on this SO post: Celery beat - different time zone per task

Note that I made this change this morning - I was running a previous version of the code without these settings.

Currently, I am saving the celery results to CELERY_RESULT_BACKEND = 'django-db'.

Since implementing the change that allows for different tasks to be run according to different timezones I am getting an error when I run celery -A backend beat -l info.

The output is very long, so here are the head and the tail.

Head:

[2021-10-29 07:29:36,059: INFO/MainProcess] beat: Starting...
[2021-10-29 07:29:36,067: ERROR/MainProcess] Cannot add entry 'celery.backend_cleanup' to database schedule: ValidationError(["Invalid timezone '<LocalTimezone: UTC+00>'"]). Contents: {'task': 'celery.backend_cleanup', 'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>, 'options': {'expire_seconds': 43200}}

Tail:

django.core.exceptions.ValidationError: ["Invalid timezone '<LocalTimezone: UTC+00>'"]

Celery beat hangs on this last error message and I have to kill it with Ctrl+C.
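django-celery-beat stores a timezone on each CrontabSchedule row and validates it as an IANA timezone name. One plausible reading of the error is that with CELERY_ENABLE_UTC = False and no CELERY_TIMEZONE, Celery falls back to a local-timezone object whose string form, '<LocalTimezone: UTC+00>', is not such a name. Treat that as an assumption, not a confirmed diagnosis. The sketch below uses the standard-library zoneinfo module (Python 3.9+) to show which strings resolve. is_valid_tz_name is a hypothetical helper and is not part of Celery or django-celery-beat.

```python
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def is_valid_tz_name(name: str) -> bool:
    """Return True if `name` resolves to a timezone in the IANA database via zoneinfo."""
    try:
        ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError, OSError):
        # Unknown key, malformed key, or a name that is not a usable file path.
        return False
    return True


for candidate in ["Europe/London", "America/New_York", "<LocalTimezone: UTC+00>"]:
    print(candidate, is_valid_tz_name(candidate))
```

If that reading is right, giving Celery an explicit name in settings.py may let django-celery-beat store a valid value. For example, set CELERY_TIMEZONE = 'Europe/London' next to CELERY_ENABLE_UTC = False. This is a likely fix to try, not a verified one.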

I went to the Celery documentation and read the instructions for manually resetting the database when timezone-related settings change. It says:

$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

I then found some documentation that said:

Warning: If you change the Django TIME_ZONE setting your periodic task schedule will still be based on the old timezone. To fix that you would have to reset the “last run time” for each periodic task:

from django_celery_beat.models import PeriodicTask, PeriodicTasks
PeriodicTask.objects.all().update(last_run_at=None)
PeriodicTasks.changed()

Note that this will reset the state as if the periodic tasks have never run before.

So I think the cause is exactly what the warning describes. I changed the timezone settings, and the schedule is still based on the old UTC timezone, so I need to reset it. My schedules have run before, so when I type:

>>> PeriodicTask.objects.all().update(last_run_at=None)

I get the response:

13

and then when I enter:

>>> PeriodicTasks.changed()

I get a type error:

TypeError: changed() missing 1 required positional argument: 'instance'
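Two facts explain this output. First, Django's QuerySet.update() returns the number of rows it matched, so 13 means 13 PeriodicTask rows. Second, in django-celery-beat's source, PeriodicTasks.changed() is written as a signal handler that receives the instance being saved. PeriodicTasks.update_changed() bumps beat's "schedule changed" marker without needing an instance. Below is a minimal sketch of the reset under those assumptions. reset_periodic_tasks is a hypothetical helper. The models are passed in as parameters so the function can be imported without Django configured.

```python
def reset_periodic_tasks(periodic_task_model, periodic_tasks_model):
    """Clear last_run_at on every periodic task and tell beat to reload.

    Intended to be called with django_celery_beat.models.PeriodicTask and
    django_celery_beat.models.PeriodicTasks (assumed, not verified here).
    """
    # A single SQL UPDATE; Django returns the number of rows matched.
    updated = periodic_task_model.objects.all().update(last_run_at=None)
    # update() bypasses save() and model signals, so beat's change marker is
    # not bumped automatically; update_changed() bumps it with no instance.
    periodic_tasks_model.update_changed()
    return updated
```

From `python manage.py shell`, import PeriodicTask and PeriodicTasks from django_celery_beat.models and call `reset_periodic_tasks(PeriodicTask, PeriodicTasks)`.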

So my question is:

What do I do to update PeriodicTask and PeriodicTasks? What arguments should I pass to PeriodicTasks.changed(), and is 13 the expected response to the first command?

Here is my celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
import pytz
from datetime import datetime

os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'backend.settings'
)

app = Celery(
    'backend'
)

app.config_from_object(
    settings,
    namespace='CELERY'
)

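def uk_time():
    return datetime.now(pytz.timezone('Europe/London'))

def us_time():
    # 'EST' is a fixed UTC-5 offset with no daylight saving time;
    # 'America/New_York' follows the EST/EDT switch.
    return datetime.now(pytz.timezone('America/New_York'))

def jp_time():
    return datetime.now(pytz.timezone('Japan'))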

# Celery Beat Settings
app.conf.beat_schedule={
    'generate_signals_london': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=uk_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('UK',),
    },

    'generate_signals_ny': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=7,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NY',),
    },

    'generate_signals_nyse': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=9,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NYSE',),
    },

    'generate_signals_asia': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=jp_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('JP',),
    },

}

app.autodiscover_tasks()
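pytz.timezone('EST') is a fixed UTC-5 offset with no daylight saving rules. A us_time() built on it therefore reads one hour behind New York clocks while New York is on EDT. The sketch below uses the standard-library zoneinfo module (Python 3.9+) instead of pytz. It models 'EST' as datetime.timezone(timedelta(hours=-5)) and compares it with 'America/New_York'. local_hour is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

# Fixed UTC-5 offset: how the 'EST' name behaves (no DST rules at all).
FIXED_EST = timezone(timedelta(hours=-5))
# Canonical IANA zone for New York, which switches between EST and EDT.
NEW_YORK = ZoneInfo("America/New_York")


def local_hour(utc_dt: datetime, tz) -> int:
    """Hour of the day that a clock in `tz` shows at the UTC instant `utc_dt`."""
    return utc_dt.astimezone(tz).hour


summer = datetime(2021, 7, 1, 12, 0, tzinfo=timezone.utc)   # New York on EDT (UTC-4)
winter = datetime(2021, 1, 15, 12, 0, tzinfo=timezone.utc)  # New York on EST (UTC-5)

print(local_hour(summer, FIXED_EST), local_hour(summer, NEW_YORK))  # 7 8
print(local_hour(winter, FIXED_EST), local_hour(winter, NEW_YORK))  # 7 7
```

Under this model, a crontab with hour=7 whose nowfun returns 'EST' time would fire at 08:00 New York time in summer.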
Credendum answered 29/10, 2021 at 8:30

When tasks in one schedule run in different timezones and depend on daylight saving time (DST), the schedule has to be updated dynamically rather than set once.
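To make the DST point concrete, here is a minimal sketch using the standard-library zoneinfo module (Python 3.9+). The same 08:00 London wall-clock time maps to different UTC hours in winter and summer. A schedule stored as a fixed UTC hour would therefore drift by an hour twice a year. utc_hour_for_local is a hypothetical helper for illustration.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

LONDON = ZoneInfo("Europe/London")


def utc_hour_for_local(year: int, month: int, day: int, hour: int, tz: ZoneInfo) -> int:
    """UTC hour corresponding to `hour`:00 local wall-clock time in `tz` on the given date."""
    local = datetime(year, month, day, hour, 0, tzinfo=tz)
    return local.astimezone(timezone.utc).hour


print(utc_hour_for_local(2021, 1, 15, 8, LONDON))  # 8  (GMT, UTC+0)
print(utc_hour_for_local(2021, 7, 1, 8, LONDON))   # 7  (BST, UTC+1)
```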

First, register a task that updates the beat schedule stored in the database:

import os
from django import setup
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
setup()
app = Celery('api')
app.conf.timezone = 'UTC'

app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.broker_connection_retry_on_startup = True

# Register database scheduler for beat
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

# Register our `update_beat_schedule` task to run every Sunday at 20:00 UTC
app.conf.beat_schedule = {
    'update_beat_schedule': {
        'task': 'utility_app.tasks.update_beat_schedule',
        'schedule': crontab(hour=20, minute=0, day_of_week='sun'),
        'args': ()
    },
}

app.autodiscover_tasks()

Then have that task build each schedule with everything it needs and update the PeriodicTask model. I filter by name first so that existing instances are updated; otherwise the task would try to create new instances instead.

from django_celery_beat.models import PeriodicTask, CrontabSchedule
from celery import shared_task
import json
from pytz import timezone
from datetime import datetime
from utility_app.utils import first_business_days

class UtilsAppError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(f"{message}")

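def get_mt4_timezone():
    eastern = timezone('US/Eastern')
    # Check DST against the current time in New York itself. Localizing datetime.now()
    # treats the server's local clock as Eastern time, which can give the wrong answer
    # around the switchover when the server runs in another timezone.
    is_dst = bool(datetime.now(eastern).dst())
    # Etc/GMT-3 means UTC+3 and Etc/GMT-2 means UTC+2 (Etc/ names invert the sign).
    mt4_tz = 'Etc/GMT-3' if is_dst else 'Etc/GMT-2'
    return mt4_tz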

def get_year_month_day():
    tz = timezone(get_mt4_timezone())
    current_mt4_datetime = datetime.now(tz)
    current_month = current_mt4_datetime.month
    current_year = current_mt4_datetime.year
    current_day = current_mt4_datetime.day
    return current_year, current_month, current_day

def get_day_of_month_or_week(period='month'):
    year, month, day = get_year_month_day()
    first_business_day_next_month, first_business_day_following_week = first_business_days(year, month, day)
    day_of_month = first_business_day_next_month.day
    # date.weekday() counts Monday as 0, but cron's day_of_week counts Sunday as 0.
    day_of_week = (first_business_day_following_week.weekday() + 1) % 7
    return day_of_month if period == 'month' else day_of_week


@shared_task
def update_beat_schedule():
    try:
        mt4_timezone = get_mt4_timezone()
        day_of_month = get_day_of_month_or_week('month')
        day_of_week = get_day_of_month_or_week('week')

        tasks_to_update = [
            {
                'name': 'monthly_analysis', 
                'task': 'signals_app.tasks.technical_analysis', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'day_of_month': day_of_month, 
                'args': (mt4_timezone,)
            },
            {
                'name': 'weekly_analysis', 
                'task': 'signals_app.tasks.technical_analysis', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'day_of_week': day_of_week, 
                'args': (mt4_timezone,)
            },
            {
                'name': 'tokyo_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 0, 
                'timezone':mt4_timezone, 
                'args': ('Tokyo', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'london_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8, 
                'timezone':mt4_timezone, 
                'args': ('London', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'ny_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 12, 
                'timezone':mt4_timezone, 
                'args': ('NewYork', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'nyse_bias', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 16, 
                'timezone':mt4_timezone, 
                'args': ('NYSE', 'market_open_bias', mt4_timezone)
            },
            {
                'name': 'tokyo_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 9, 
                'timezone':'Asia/Tokyo', 
                'args': ('Tokyo', 'market_open', mt4_timezone)
            },
            {
                'name': 'london_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8,
                'timezone':'Europe/London', 
                'args': ('London', 'market_open', mt4_timezone)
            },
            {
                'name': 'ny_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 8, 
                'timezone':'US/Eastern', 
                'args': ('NewYork', 'market_open', mt4_timezone)
            },
            {
                'name': 'nyse_market_open', 
                'task': 'signals_app.tasks.process_signal_tasks', 
                'hour': 10, 
                'timezone':'US/Eastern', 
                'args': ('NYSE', 'market_open', mt4_timezone)
            }
        ]

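        for task in tasks_to_update:
            try:
                # First, try to find the PeriodicTask by name.
                periodic_task = PeriodicTask.objects.filter(name=task['name']).first()

                if periodic_task:
                    # If it exists, update its CrontabSchedule in place.
                    # Any other PeriodicTask that shares this CrontabSchedule row changes too.
                    crontab = periodic_task.crontab
                    crontab.hour = task['hour']
                    crontab.minute = 0
                    crontab.day_of_month = task.get('day_of_month', '*')
                    crontab.day_of_week = task.get('day_of_week', '*')
                    crontab.timezone = task['timezone']
                    crontab.save()
                    # The args embed mt4_timezone, which changes with DST, so refresh them as well.
                    periodic_task.task = task['task']
                    periodic_task.args = json.dumps(task.get('args', []))
                    periodic_task.save()
                else:
                    # If it doesn't exist, create a new CrontabSchedule and PeriodicTask.
                    crontab, _ = CrontabSchedule.objects.get_or_create(
                        hour=task['hour'],
                        minute=0,
                        day_of_month=task.get('day_of_month', '*'),
                        day_of_week=task.get('day_of_week', '*'),
                        timezone=task['timezone']
                    )
                    PeriodicTask.objects.create(
                        name=task['name'],
                        task=task['task'],  # without a task name, beat has nothing to send
                        crontab=crontab,
                        args=json.dumps(task.get('args', []))
                    )
            except Exception as e:
                raise UtilsAppError(f"Error updating beat schedule for task {task['name']}: {e}") from e

    except UtilsAppError:
        raise
    except Exception as e:
        # Raised before the loop (e.g. while computing dates), so there is no task name yet.
        raise UtilsAppError(f"Error preparing beat schedule: {e}") from e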
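The date logic in the helpers above can be checked without Django or a database. The sketch below is an illustrative rewrite, not the answer's code. It uses zoneinfo (Python 3.9+) instead of pytz, and it takes the UTC instant as a parameter so the results are reproducible. It also adds a hypothetical to_cron_day_of_week helper, because Python's date.weekday() counts Monday as 0 while cron-style day_of_week (used by Celery's crontab and django-celery-beat's CrontabSchedule) counts Sunday as 0.

```python
from datetime import date, datetime, timezone
from zoneinfo import ZoneInfo

# Canonical IANA name for New York ('US/Eastern' is a legacy alias of it).
NEW_YORK = ZoneInfo("America/New_York")


def mt4_timezone_at(utc_dt: datetime) -> str:
    """Same rule as get_mt4_timezone(), but for an explicit UTC instant."""
    in_dst = bool(utc_dt.astimezone(NEW_YORK).dst())
    # Etc/GMT-3 is UTC+3 and Etc/GMT-2 is UTC+2: Etc/ names invert the sign.
    return "Etc/GMT-3" if in_dst else "Etc/GMT-2"


def to_cron_day_of_week(d: date) -> int:
    """Convert Python's Monday=0 weekday to cron's Sunday=0 day_of_week."""
    return (d.weekday() + 1) % 7


summer = datetime(2021, 7, 1, 12, 0, tzinfo=timezone.utc)
winter = datetime(2021, 1, 15, 12, 0, tzinfo=timezone.utc)
print(mt4_timezone_at(summer))                 # Etc/GMT-3
print(mt4_timezone_at(winter))                 # Etc/GMT-2
print(ZoneInfo("Etc/GMT-3").utcoffset(summer))  # 3:00:00
print(to_cron_day_of_week(date(2021, 11, 1)))   # 1 (Monday)
print(to_cron_day_of_week(date(2021, 10, 31)))  # 0 (Sunday)
```

Passing the instant in, rather than calling datetime.now() inside, is what makes the DST branch testable for both seasons.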
Credendum answered 29/10, 2021 at 9:44