I have a simple Django (v3.1) app where I receive data from a form, process it with a view and then pass it on to Celery (v4.4.7, RabbitMQ as broker). Depending on the data submitted in the form, it can be a one-time task or a periodic task.
The periodic task should execute the same task as the one-time task, but, well, with a periodic schedule. I would like to pass that schedule to the task, including a start date, an end date and an interval (e.g.: execute every 2 days at 4pm, starting now and ending in 4 weeks).
My view (shortened and renamed for illustration purposes, of course):
# views.py
if request.method == 'POST':
    form = BackupForm(request.POST)
    if form.is_valid():
        data = ...
        if not form.cleaned_data['periodic']:
            # execute one-time task
            celery_task = single_task.delay(data)
        else:
            schedule = {
                'first_backup': form.cleaned_data['first_backup'],
                'last_backup': form.cleaned_data['last_backup'],
                'intervall_every': form.cleaned_data['intervall_every'],
                'intervall_unit': form.cleaned_data['intervall_unit'],
                'intervall_time': form.cleaned_data['intervall_time'],
            }
            # execute periodic task, depending on the schedule submitted in the form
            celery_task = single_task.delay(data, schedule=schedule)
        return HttpResponseRedirect(reverse('app:index'))
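One thing I am unsure about is how the schedule reaches the worker. Celery 4 serializes task arguments as JSON by default, so I assume the date and time values from `cleaned_data` are safer to pass as ISO strings and parse again inside the task. A minimal sketch of that round trip in plain Python follows. The helper names `schedule_to_json_safe` and `schedule_from_json_safe` are hypothetical, and I am assuming that `first_backup` and `last_backup` are `datetime` objects and `intervall_time` is a `time`:

```python
import json
from datetime import datetime, time


# Hypothetical helpers; the field types are assumptions about my form.
def schedule_to_json_safe(schedule: dict) -> dict:
    """Turn date/time values into ISO 8601 strings so the dict survives JSON."""
    return {
        'first_backup': schedule['first_backup'].isoformat(),
        'last_backup': schedule['last_backup'].isoformat(),
        'intervall_every': int(schedule['intervall_every']),
        'intervall_unit': str(schedule['intervall_unit']),
        'intervall_time': schedule['intervall_time'].isoformat(),
    }


def schedule_from_json_safe(payload: dict) -> dict:
    """Inverse of schedule_to_json_safe, meant to run inside the task."""
    return {
        'first_backup': datetime.fromisoformat(payload['first_backup']),
        'last_backup': datetime.fromisoformat(payload['last_backup']),
        'intervall_every': payload['intervall_every'],
        'intervall_unit': payload['intervall_unit'],
        'intervall_time': time.fromisoformat(payload['intervall_time']),
    }


# Made-up example values: every 2 days at 4pm for 4 weeks.
example = {
    'first_backup': datetime(2020, 9, 1, 10, 0),
    'last_backup': datetime(2020, 9, 29, 10, 0),
    'intervall_every': 2,
    'intervall_unit': 'days',
    'intervall_time': time(16, 0),
}

# Simulate the trip through the broker with a plain JSON dump/load.
wire = json.dumps(schedule_to_json_safe(example))
restored = schedule_from_json_safe(json.loads(wire))
```

In the view this would become something like `single_task.delay(data, schedule=schedule_to_json_safe(schedule))`, with the task calling `schedule_from_json_safe` on its end.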
The single task looks like this:
# tasks.py
import asyncio

from celery import shared_task

@shared_task
def single_task(data: dict, **kwargs) -> None:
    asyncio.run(bulk_screen(data=data))
    # TODO: receive schedule if periodic and create a periodic task with it
This works well for the single task. However, I don't know how to adapt this to create dynamic periodic tasks. My schedule data varies, depending on the users' form input. I have to create the periodic task at runtime.
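To make the expected behaviour concrete, this is roughly what I want a submitted schedule to expand to. It is only a sketch in plain Python without Celery. The function name `expected_run_times` is hypothetical, and I assume that `intervall_unit` is one of 'hours', 'days' or 'weeks' and that `first_backup` and `last_backup` are either both naive or both timezone-aware:

```python
from datetime import datetime, time, timedelta
from typing import List

ALLOWED_UNITS = ('hours', 'days', 'weeks')  # assumed choices of the form field


def expected_run_times(schedule: dict) -> List[datetime]:
    """List every point in time at which the periodic task should fire."""
    unit = schedule['intervall_unit']
    if unit not in ALLOWED_UNITS:
        raise ValueError(f'unsupported unit: {unit!r}')
    step = timedelta(**{unit: schedule['intervall_every']})
    if step <= timedelta(0):
        raise ValueError('interval must be positive')

    first = schedule['first_backup']
    # First run: the requested time of day on the start date ...
    run = datetime.combine(first.date(), schedule['intervall_time'],
                           tzinfo=first.tzinfo)
    # ... or on the following day if that time has already passed.
    if run < first:
        run += timedelta(days=1)

    runs = []
    while run <= schedule['last_backup']:
        runs.append(run)
        run += step
    return runs


# Made-up example: every 2 days at 4pm, starting now, ending in 4 weeks.
start = datetime(2020, 9, 1, 10, 0)
runs = expected_run_times({
    'first_backup': start,
    'last_backup': start + timedelta(weeks=4),
    'intervall_every': 2,
    'intervall_unit': 'days',
    'intervall_time': time(16, 0),
})
```

So the question is essentially how to get Celery to fire `single_task` at exactly these times without hardcoding anything.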
According to the official documentation on periodic tasks, crontab schedules are what I need:
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
Although this looks fine, it sits in the Celery config with hardcoded schedules, whereas my schedules are only known at runtime.
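If I stick with crontab, I imagine the form values would have to be translated into crontab arguments at runtime, something like the sketch below. `crontab_kwargs` is a hypothetical helper that only builds the keyword arguments (`minute`, `hour` and `day_of_month` are real `crontab` parameters), and the unit names are again my assumption about the form choices:

```python
from datetime import time


def crontab_kwargs(schedule: dict) -> dict:
    """Build keyword arguments for celery.schedules.crontab from the form data."""
    every = schedule['intervall_every']
    unit = schedule['intervall_unit']
    at = schedule['intervall_time']
    if unit == 'hours':
        # Every N hours, at the minute given in the form.
        return {'minute': at.minute, 'hour': f'*/{every}'}
    if unit == 'days':
        # Every N-th day of the month, at the time given in the form.
        return {'minute': at.minute, 'hour': at.hour,
                'day_of_month': f'*/{every}'}
    # I see no obvious crontab expression for "every N weeks".
    raise ValueError(f'no crontab mapping for unit: {unit!r}')


# Made-up example: every 2 days at 4pm.
kwargs = crontab_kwargs({
    'intervall_every': 2,
    'intervall_unit': 'days',
    'intervall_time': time(16, 0),
})
```

The result would then be used as `crontab(**kwargs)`. As far as I understand cron semantics, though, `*/2` on `day_of_month` starts over at the beginning of each month, so it is not a true "every 2 days" interval, and there is no place for my start and end dates.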
I also read about the on_after_finalize.connect decorator, where I could do something like this:
@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, task123.s('hello'))
But I don't know how to pass the schedule to this function. Also, what is the sender? Can I pass it from my view?
Then I read about populating the relevant models in celery beat here. But I guess there has to be a more elegant way, using the stable version without deprecated decorators.
Thank you.