python celery - how to add CELERYBEAT_SCHEDULE task at runtime to a worker?
Asked Answered
C

2

6

I have created a celery worker with a single celerybeat schedule task which runs at 5 seconds time interval. How can I add another beat task dynamically to the celery worker without stopping it?

Example

app.conf.update(
   CELERY_TASK_RESULT_EXPIRES=3600,
   CELERY_TIMEZONE = 'UTC',
   CELERYBEAT_SCHEDULE = {
    'long-run-5-secs': {
        'task': 'test_proj.tasks.test',
        'schedule': timedelta(seconds=5),
        'args': (16, )
    }
   }
)

With the above configuration, I am able to run the celery worker with beat mode successfully.

Now I need add the below beat schedule dynamically:

'long-run-2-secs': {
    'task': 'test_proj.tasks.test',
    'schedule': timedelta(seconds=2),
    'args': (14, ) },

Thanks

Cothran answered 13/11, 2015 at 3:28 Comment(0)
B
1

I've been looking for solution for the very same problem. I am affraid you'll have to wait for Celery ver.4.0. Dynamic task scheduling is only currently supported in the development version : http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html#beat-entries

Begga answered 20/5, 2016 at 9:38 Comment(0)
D
0

One possible way is to store the tasks in the database and add remove tasks dynamically. You can use database backed celery beat scheduler for the same. Refer https://django-celery-beat.readthedocs.io/en/latest/. PeriodicTask database store the periodic tasks. You can manipulate the periodic task by using database commands (Django ORM).

This is how I handled the dynamic tasks (Create and stop tasks dynamically).

from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

chon_schedule = CrontabSchedule.objects.create(minute='40', hour='08', day_of_week='*', day_of_month='*', month_of_year='*') # To create a cron schedule. 
schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) # To create a schedule to run everu 10 min.
PeriodicTask.objects.create(crontab=chon_schedule, name='name_to_identify_task',task='name_of_task') # It creates a entry in the database describing that periodic task (With cron schedule).
task = PeriodicTask.objects.create(interval=schedule, name='run for every 10 min', task='for_each_ten_min', ) # It creates a periodic task with interval schedule

Whenever you update a PeriodicTask a counter in this table is also incremented, which tells the celery beat service to reload the schedule from the database.

So you don't need to restart the or kill the beat. If you want to stop a task when particular criteria met then

periodic_task = PeriodicTask.objects.get(name='run for every 10 min')
periodic_task.enabled = False
periodic_task.save()

When enabled is False then the periodic task becomes idle. You can again make it active by making enable = True.

If you no longer needs the task then you can simply delete the entry.

Diagnose answered 28/11, 2018 at 7:10 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.