Django Celery: create periodic task at runtime with schedule depending on user input

I have a simple Django (v3.1) app where I receive data from a form, process it with a view and then pass it on to Celery (v4.4.7, RabbitMQ as broker). Depending on the data submitted in the form, it can be a one-time task or a periodic task.

The periodic task should execute the same task as the one-time task, but, well, with a periodic schedule. I would like to pass that schedule to the task, including a start date, an end date and an interval (e.g. execute every 2 days at 4pm, starting now and ending 4 weeks from now).

My view (shortened and renamed for illustration purposes, of course):

# views.py
from django.http import HttpResponseRedirect
from django.urls import reverse

from .forms import BackupForm  # assumed module layout
from .tasks import single_task


def backup_view(request):  # hypothetical view name
    if request.method == 'POST':
        form = BackupForm(request.POST)

        if form.is_valid():
            data = ...

            if not form.cleaned_data['periodic']:
                # execute one-time task
                celery_task = single_task.delay(data)

            else:
                schedule = {
                    'first_backup': form.cleaned_data['first_backup'],
                    'last_backup': form.cleaned_data['last_backup'],
                    'intervall_every': form.cleaned_data['intervall_every'],
                    'intervall_unit': form.cleaned_data['intervall_unit'],
                    'intervall_time': form.cleaned_data['intervall_time'],
                }

                # execute periodic task, depending on the schedule submitted in the form
                celery_task = single_task.delay(data, schedule=schedule)

            return HttpResponseRedirect(reverse('app:index'))

The single task looks like this:

# tasks.py
import asyncio

from celery import shared_task


@shared_task
def single_task(data: dict, **kwargs) -> None:
    asyncio.run(bulk_screen(data=data))  # bulk_screen is defined elsewhere in the app
    # TODO: receive schedule if periodic and create a periodic task with it

This works well for the single task. However, I don't know how to adapt it to create dynamic periodic tasks. My schedule data varies depending on the user's form input, so I have to create the periodic task at runtime.

According to the official documentation on periodic tasks, a crontab schedule is what I need:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

This looks fine, but it lives in the Celery config with hardcoded schedules.

I also read about the on_after_finalize.connect decorator, with which I could do something like this:

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, task123.s('hello'))

But I don't know how to pass the schedule to this function. Also, what is the sender? Can I pass it from my view?

Then I read about populating the relevant models in celery beat, but I guess there has to be a more elegant way that uses the stable version without deprecated decorators.

Thank you.

Rattoon asked 10/11, 2020 at 13:29
Did you try celery-redbeat? (Landaulet)

You should definitely try populating the PeriodicTask and CrontabSchedule models provided by the django_celery_beat package (see the package docs).

Celery beat is the scheduler: it runs periodically and sends every task that is due according to its schedule (a database-backed one in the case of django_celery_beat) to the workers, which execute it. Reference1, Reference2

Celery beat is certainly the cleanest way to handle periodic tasks with different schedules, rather than writing your own scheduler.
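A minimal sketch of what that could look like for the question's form, not tested against a live project. It assumes first_backup and last_backup are dates, intervall_time is a time, intervall_unit already holds one of IntervalSchedule's period values ("days", "hours", ...), and the task is registered as app.tasks.single_task; the helper names periodic_task_fields and create_backup_task are hypothetical. It uses IntervalSchedule rather than CrontabSchedule because "every 2 days" maps directly onto an interval, and bounds the run with PeriodicTask's start_time and expires fields.

```python
import json
from datetime import datetime, time, timezone


def periodic_task_fields(data: dict, schedule: dict) -> dict:
    """Translate the form's schedule dict into PeriodicTask field values.

    Assumes first_backup/last_backup are dates, intervall_time is a time and
    intervall_unit matches an IntervalSchedule period ("days", "hours", ...).
    Times are treated as UTC only to keep the sketch short.
    """
    start = datetime.combine(
        schedule["first_backup"], schedule["intervall_time"], tzinfo=timezone.utc
    )
    # Stop triggering once the day of the last backup is over.
    expires = datetime.combine(schedule["last_backup"], time.max, tzinfo=timezone.utc)
    return {
        "every": int(schedule["intervall_every"]),
        "period": schedule["intervall_unit"],
        "start_time": start,
        "expires": expires,
        # PeriodicTask stores positional arguments as a JSON-encoded list.
        "args": json.dumps([data]),
    }


def create_backup_task(name: str, data: dict, schedule: dict):
    """Create a database-backed periodic task that celery beat will pick up."""
    # Imported inside the function so this module can be imported before
    # Django's app registry is ready.
    from django_celery_beat.models import IntervalSchedule, PeriodicTask

    fields = periodic_task_fields(data, schedule)
    interval, _ = IntervalSchedule.objects.get_or_create(
        every=fields["every"], period=fields["period"]
    )
    return PeriodicTask.objects.create(
        name=name,  # PeriodicTask.name must be unique
        task="app.tasks.single_task",  # assumed registered task name
        interval=interval,
        args=fields["args"],
        start_time=fields["start_time"],
        expires=fields["expires"],
    )
```

The periodic branch of the view would then call something like create_backup_task(f"backup-{uuid.uuid4()}", data, schedule) instead of single_task.delay(...). Beat has to run with the database scheduler for this to work, e.g. celery -A proj beat --scheduler django_celery_beat.schedulers:DatabaseScheduler (proj being your project name).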

Razz answered 13/11, 2020 at 18:08

I believe that in Celery 4.x you cannot set up the dynamic schedule you require (I recall seeing talk of it appearing in 5, but I'm not sure whether it has). Your code looks promising, however: you just need to check for a schedule in single_task and, if required, schedule a new run with a countdown or eta. The task can schedule its own task(s) for a future time.

e.g.

import asyncio
from datetime import datetime
from typing import Optional

from celery import shared_task


@shared_task
def single_task(data: dict, **kwargs) -> None:
    asyncio.run(bulk_screen(data=data))  # bulk_screen comes from the question's code
    schedule = kwargs.get("schedule")
    if schedule:  # do we need to run this again?
        next_run_at = get_next_run_at(schedule)
        if next_run_at:
            # yes! queue the same task again for the next slot
            single_task.apply_async(args=[data], kwargs=kwargs, eta=next_run_at)
        # else the task will complete without rescheduling itself


def get_next_run_at(schedule: dict) -> Optional[datetime]:
    """Return None if the schedule has expired or is invalid,
    else return the date and time to run the next task.
    """
    pass  # to be implemented for your schedule format
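One way to fill in get_next_run_at, as a sketch only. It assumes the form data has already been converted into hypothetical keys first_run and last_run (timezone-aware datetimes) and every (a positive timedelta). Depending on the serializer, datetimes may arrive in the task as strings, so real code would parse them first. The optional now parameter exists only to make the function easy to test.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def get_next_run_at(schedule: dict, now: Optional[datetime] = None) -> Optional[datetime]:
    """Return the next run strictly after `now`, or None if the schedule
    is invalid or has expired.

    Assumes hypothetical keys: first_run and last_run (aware datetimes)
    and every (a positive timedelta).
    """
    now = now or datetime.now(timezone.utc)
    first, last, every = schedule["first_run"], schedule["last_run"], schedule["every"]
    if every <= timedelta(0) or last < first:
        return None  # invalid schedule
    if now < first:
        return first
    # Whole intervals already elapsed since the first run, plus one.
    steps = (now - first) // every + 1
    next_run = first + steps * every
    return next_run if next_run <= last else None
```

Because the next slot is computed from first_run rather than from the moment the task happened to finish, runs stay aligned to 4pm even if individual executions are delayed.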

See the Celery docs for details on apply_async and eta. I haven't tested this, but the principle is sound. You should:

  • add error handling within single_task so that subsequent runs are still scheduled if the main code raises an error
  • possibly add extra checks before doing the main task to ensure you are not running a zombie task (e.g. the user has unsubscribed)
  • DEFINITELY ensure that the user input driving the schedule has been thoroughly validated, not just that the dates are dates. "Never trust user input"
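The first point (keep rescheduling even when the main work fails) can be sketched with try/finally. This version is deliberately framework-agnostic: work and reschedule are plain callables, and next_run_at is computed before the work starts, so an exception in the work cannot stop the next run from being queued. The function name run_then_reschedule is hypothetical.

```python
from datetime import datetime
from typing import Callable, Optional


def run_then_reschedule(
    work: Callable[[], None],
    next_run_at: Optional[datetime],
    reschedule: Callable[[datetime], None],
) -> None:
    """Run `work`, then queue the next run even if `work` raised."""
    try:
        work()
    finally:
        # The finally block runs on success and on failure; any exception
        # from work() is re-raised afterwards, so it still shows up in logs.
        if next_run_at is not None:
            reschedule(next_run_at)
```

Inside single_task this might look like run_then_reschedule(lambda: asyncio.run(bulk_screen(data=data)), next_run_at, lambda eta: single_task.apply_async(args=[data], kwargs=kwargs, eta=eta)).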

If you prefer, you can use countdown rather than eta with apply_async.
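Switching from eta to countdown only means passing the number of seconds until the target time instead of the time itself. A small helper (hypothetical name) that never returns a negative delay:

```python
from datetime import datetime, timezone
from typing import Optional


def countdown_for(eta: datetime, now: Optional[datetime] = None) -> float:
    """Seconds from `now` until `eta`, clamped at zero for times in the past."""
    now = now or datetime.now(timezone.utc)
    return max(0.0, (eta - now).total_seconds())
```

Usage: single_task.apply_async(args=[data], kwargs=kwargs, countdown=countdown_for(next_run_at)).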

Of course, it may be easier (and more reliable, as correctly performing the error handling I mentioned is not trivial) to load the schedules into a database and then run a simple Celery task every minute that checks for backups that are due and fires them off in parallel via single_task.delay(data).
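The polling approach boils down to: every minute, find the schedules whose next run time has passed, fire each one, and advance it. A rough sketch in which an in-memory dataclass stands in for a hypothetical BackupSchedule Django model (the class and its fields are assumptions). In a real app the checker would query the database and call single_task.delay(payload) for each returned payload.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class BackupSchedule:
    """In-memory stand-in for a hypothetical BackupSchedule database model."""
    data: dict
    next_run_at: Optional[datetime]  # None once the schedule has finished
    last_run: datetime
    every: timedelta

    def __post_init__(self) -> None:
        if self.every <= timedelta(0):
            raise ValueError("every must be a positive interval")


def collect_due(schedules: List[BackupSchedule], now: datetime) -> List[dict]:
    """Return the payloads that are due and advance each due schedule."""
    due = []
    for s in schedules:
        if s.next_run_at is None or s.next_run_at > now:
            continue
        due.append(s.data)
        # Skip any slots missed while the checker was not running.
        while s.next_run_at <= now:
            s.next_run_at += s.every
        if s.next_run_at > s.last_run:
            s.next_run_at = None  # schedule finished
    return due
```

The checker itself is a fixed schedule, so it can live in the static config, e.g. an entry in app.conf.beat_schedule with "schedule": 60.0 pointing at a hypothetical app.tasks.check_due_backups task.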

Chappelka answered 19/11, 2020 at 11:24

I had a similar challenge when building a service for uptime monitoring. I wrote an article showing my approach to dynamic periodic tasks in Celery and created an example project (available on GitHub).

The django-celery-beat package provides database models for IntervalSchedule and PeriodicTask. You will need to add a model that stores the interval and period information. The model should have a one-to-one relationship with PeriodicTask:

from django.db import models
from django_celery_beat.models import PeriodicTask

class ModelWithTask(models.Model):
 
    interval = models.IntegerField(blank=False)

    period = models.IntegerField(blank=False)

    task = models.OneToOneField(
        PeriodicTask, null=True, blank=True, on_delete=models.SET_NULL
    )

You need to create the PeriodicTask object when you create a ModelWithTask object. Then you can manipulate the periodic task as you want. For example, at the end of each task execution, just check whether the period is over.
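The "check whether the period is over" step could be sketched like this. The answer doesn't define the units, so this assumes ModelWithTask.period is a duration in seconds counted from the linked PeriodicTask's start_time, and that the task receives the ModelWithTask id so it can load the object at the end of its run. Disabling the PeriodicTask (rather than deleting it) is one option; the function names are hypothetical.

```python
from datetime import datetime, timedelta


def period_is_over(started_at: datetime, period_seconds: int, now: datetime) -> bool:
    """True once `period_seconds` have elapsed since `started_at`."""
    return now >= started_at + timedelta(seconds=period_seconds)


def disable_if_finished(obj, now: datetime) -> bool:
    """Disable obj.task (a django_celery_beat PeriodicTask) once its period is over.

    Assumes `obj` is a ModelWithTask whose `period` is in seconds and whose
    PeriodicTask has start_time set. Returns True if the task was disabled.
    """
    task = obj.task
    if task is None or task.start_time is None:
        return False
    if not period_is_over(task.start_time, obj.period, now):
        return False
    task.enabled = False  # the database scheduler skips disabled tasks
    task.save()
    return True
```

At the end of the task body you would call something like disable_if_finished(ModelWithTask.objects.get(pk=model_id), django.utils.timezone.now()).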

Hardhack answered 18/10, 2022 at 12:02

© 2022 - 2024 — McMap. All rights reserved.