Python Celery inconsistent cronjob timing for task scheduling with now function

The situation

I have a Celery task that I run in a different timezone for each customer.

Basically, for each customer in my database, I get the timezone, and then I set up the celery task this way.

'schedule': crontab(minute=30, hour=14, nowfun=self.now_function)

I want the task to run at 14:30 in the customer's timezone, hence the now_function.

My now_function simply returns the current time in the customer's timezone.

def now_function(self):
    """
    return the now function for the task
    this is used to compute the time a task should be scheduled for a given customer
    """
    return datetime.now(timezone(self.customer.c_timezone))

What is happening

The task runs at inconsistent times. On some days it runs at the expected time, that is, 14:30 in the customer's timezone. For America/Chicago that is 20:30 UTC, which is the behavior I expect.

On other days it runs at 14:30 UTC, as if the customer's timezone were ignored.
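
The two observed run times differ by exactly the customer's UTC offset. Below is a minimal sketch of that arithmetic using only the standard library. It assumes a fixed UTC-6 offset for America/Chicago (standard time only; a real zone database would also handle daylight saving time), and the date is chosen purely for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: America/Chicago in winter, modelled as a fixed UTC-6 offset.
CST = timezone(timedelta(hours=-6), "CST")

# 14:30 on the customer's wall clock ...
local_run = datetime(2022, 2, 21, 14, 30, tzinfo=CST)
# ... corresponds to 20:30 UTC, the expected run time.
expected_utc = local_run.astimezone(timezone.utc)

# The incorrect runs fire at 14:30 read as UTC instead.
wrong_utc = datetime(2022, 2, 21, 14, 30, tzinfo=timezone.utc)

print(expected_utc.strftime("%H:%M"))  # 20:30
print(wrong_utc.strftime("%H:%M"))     # 14:30
print(expected_utc - wrong_utc)        # 6:00:00
```

A six-hour gap between the two kinds of run is therefore what one would see if the customer's timezone were dropped and the crontab were evaluated against UTC.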

I am tracking whether there is a pattern between the days the task runs at the correct time and the days it runs at the incorrect time.

Additional Information

I have tried this on Celery 4.4.2 and 5.xx, but the behavior is the same.

Here is my celery config.

CELERY_REDIS_SCHEDULER_URL = redis_instance_url
# logging uses %-style placeholders; without "%s" the extra argument causes a formatting error
logger.debug("****** CELERY_REDIS_SCHEDULER_URL: %s", CELERY_REDIS_SCHEDULER_URL)
logger.debug("****** environment: %s", environment)
redbeat_redis_url = CELERY_REDIS_SCHEDULER_URL
broker_url = CELERY_REDIS_SCHEDULER_URL
result_backend = CELERY_REDIS_SCHEDULER_URL
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = ['pickle']
enable_utc = False
task_track_started = True
task_send_sent_event = True

Note that enable_utc is set to False.

  • I am using a Redis instance on AWS to run my tasks.
  • I am using the RedBeatScheduler from the RedBeat package to schedule my tasks.

If anyone has experienced this issue or can help me to reproduce it, I will be very thankful.

Other edits:

  • I have two other crontabs for the same job at the same time of day, one weekly and one monthly, and both work perfectly.
weekly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_week=1)
monthly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_month=1)

Sample Project

Here is a sample project on GitHub if you want to run and reproduce the issue.

Henhouse answered 21/2, 2022 at 10:16 Comment(1)
I have posted an answer. Your sample project has logical issues with beat_schedule.update() overwriting schedules since the task is non-unique, but my answer can be shown to work correctly with a single time_period and customer. - Eudoxia

RedBeat's encoder and decoder don't support nowfun.
Source code: https://github.com/sibson/redbeat/blob/e6d72e2/redbeat/decoder.py#L94-L102
The behaviour you see was described previously: sibson/redbeat#192 (comment 756397651)

You can subclass and replace RedBeatJSONDecoder and RedBeatJSONEncoder.

Since nowfun has to be JSON serializable, we can only support some special cases,
e.g. nowfun=partial(datetime.now, tz=pytz.timezone(self.customer.c_timezone))

from datetime import datetime
from functools import partial

from celery.schedules import crontab
import pytz
from pytz.tzinfo import DstTzInfo
from redbeat.decoder import RedBeatJSONDecoder, RedBeatJSONEncoder


class CustomJSONDecoder(RedBeatJSONDecoder):
    def dict_to_object(self, d):
        if '__type__' not in d:
            return d

        objtype = d.pop('__type__')

        if objtype == 'crontab':
            if d.get('nowfun', {}).get('keywords', {}).get('zone'):
                d['nowfun'] = partial(datetime.now, tz=pytz.timezone(d.pop('nowfun')['keywords']['zone']))
            return crontab(**d)

        d['__type__'] = objtype

        return super().dict_to_object(d)


class CustomJSONEncoder(RedBeatJSONEncoder):
    def default(self, obj):
        if isinstance(obj, crontab):
            d = super().default(obj)
            if 'nowfun' not in d and isinstance(obj.nowfun, partial) and obj.nowfun.func == datetime.now:
                zone = None
                if obj.nowfun.args and isinstance(obj.nowfun.args[0], DstTzInfo):
                    zone = obj.nowfun.args[0].zone
                elif isinstance(obj.nowfun.keywords.get('tz'), DstTzInfo):
                    zone = obj.nowfun.keywords['tz'].zone
                if zone:
                    d['nowfun'] = {'keywords': {'zone': zone}}
            return d

        return super().default(obj)

Replace the classes in redbeat.schedulers:

from redbeat import schedulers

schedulers.RedBeatJSONDecoder = CustomJSONDecoder
schedulers.RedBeatJSONEncoder = CustomJSONEncoder
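
To illustrate why the answer limits itself to `partial(datetime.now, tz=...)`: an arbitrary callable cannot be dumped to JSON, whereas a `partial` exposes its function and arguments as attributes that can be inspected and rebuilt. The sketch below mirrors the encode/decode idea with the standard library only. It stores a fixed UTC offset instead of a pytz zone name, and `encode_nowfun` and `decode_nowfun` are hypothetical helper names that are not part of RedBeat or Celery.

```python
import json
from datetime import datetime, timedelta, timezone
from functools import partial


def encode_nowfun(nowfun):
    """Return a JSON-friendly dict for partial(datetime.now, tz=<fixed offset>), else None."""
    if not (isinstance(nowfun, partial) and nowfun.func == datetime.now):
        return None  # unsupported callable, e.g. a bound method
    tz = nowfun.keywords.get("tz")
    if not isinstance(tz, timezone):
        return None
    seconds = int(tz.utcoffset(None).total_seconds())
    return {"keywords": {"utcoffset_seconds": seconds}}


def decode_nowfun(d):
    """Rebuild the partial from the dict produced by encode_nowfun."""
    offset = timedelta(seconds=d["keywords"]["utcoffset_seconds"])
    return partial(datetime.now, tz=timezone(offset))


nowfun = partial(datetime.now, tz=timezone(timedelta(hours=-6)))

# A callable cannot be written to JSON directly.
try:
    json.dumps(nowfun)
    serializable = True
except TypeError:
    serializable = False

# Its inspectable parts can be, and they survive a round trip.
payload = json.dumps(encode_nowfun(nowfun))
restored = decode_nowfun(json.loads(payload))

print(serializable)  # False
print(payload)
print(restored().utcoffset() == timedelta(hours=-6))  # True
```

By the same logic, a `nowfun` that is not such a `partial` (for example a bound method like `self.now_function`) would not be picked up by the `isinstance(obj.nowfun, partial)` check in the custom encoder.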
Eudoxia answered 7/3, 2022 at 18:37 Comment(4)
Thank you, I knew it had something to do with RedBeat. I will try it tomorrow and let you know how it goes. - Henhouse
Yeah, I will definitely mark it as an answer; I haven't had the time to test it yet. I will most likely do it on Thursday when I am working on the project. But thank you again. - Henhouse
Hey @aaron, this does not seem to have fixed my issue. I am checking whether I did anything wrong while implementing it in our codebase. I will let you know. - Henhouse
Sure. I can take a look at your sample project again after you update it. - Eudoxia
