How to dynamically add / remove periodic tasks to Celery (celerybeat)
S

9

64

If I have a function defined as follows:

def add(x, y):
    return x + y

Is there a way to dynamically add this function as a celery PeriodicTask and kick it off at runtime? I'd like to be able to do something like (pseudocode):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

I would also want to stop or remove that task dynamically with something like (pseudocode):

celery.beat.remove_task(some_unique_task_id)

or

celery.beat.stop(some_unique_task_id)

FYI I am not using djcelery, which lets you manage periodic tasks via the django admin.

Spoken answered 17/4, 2012 at 16:13 Comment(0)
P
22

No, I'm sorry, this is not possible with the regular celerybeat.

But it's easily extensible to do what you want, e.g. the django-celery scheduler is just a subclass reading and writing the schedule to the database (with some optimizations on top).

Also you can use the django-celery scheduler even for non-Django projects.

Something like this:

  • Install django + django-celery:

    $ pip install -U django django-celery

  • Add the following settings to your celeryconfig:

    DATABASES = {
        'default': {
            'NAME': 'celerybeat.db',
            'ENGINE': 'django.db.backends.sqlite3',
        },
    }
    INSTALLED_APPS = ('djcelery', )
    
  • Create the database tables:

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
    
  • Start celerybeat with the database scheduler:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
        -S djcelery.schedulers.DatabaseScheduler
    

There is also the djcelerymon command, which non-Django projects can use to start celerycam and a Django Admin web server in the same process. You can then edit your periodic tasks in a web interface:

   $ djcelerymon

(Note: for some reason djcelerymon can't be stopped with Ctrl+C; you have to use Ctrl+Z followed by kill %1.)
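To make the idea of "a scheduler that reads its schedule from a database" more concrete, here is a toy sketch that uses only the standard library. It is not djcelery's DatabaseScheduler and does not use Celery's scheduler API: the class `ToyDatabaseScheduler`, the table `periodic_task` and its columns are all hypothetical names chosen for illustration.

```python
import sqlite3


class ToyDatabaseScheduler:
    """Toy model of a database-backed beat scheduler (not Celery's API)."""

    def __init__(self, conn):
        self.conn = conn
        self.last_run = {}  # task name -> time of the last run
        conn.execute(
            "CREATE TABLE IF NOT EXISTS periodic_task ("
            "name TEXT PRIMARY KEY, every REAL, enabled INTEGER DEFAULT 1)"
        )

    def tick(self, now):
        """Re-read the table and return the names of the tasks due at `now`."""
        due = []
        rows = self.conn.execute(
            "SELECT name, every FROM periodic_task WHERE enabled = 1 ORDER BY name"
        ).fetchall()
        for name, every in rows:
            last = self.last_run.get(name)
            if last is None or now - last >= every:
                self.last_run[name] = now
                due.append(name)
        return due


conn = sqlite3.connect(":memory:")
scheduler = ToyDatabaseScheduler(conn)

# Add a task at runtime: insert a row, and the next tick picks it up.
conn.execute("INSERT INTO periodic_task (name, every) VALUES ('add', 30)")
first = scheduler.tick(now=0)    # 'add' has never run, so it is due
second = scheduler.tick(now=10)  # only 10 seconds elapsed, not due
third = scheduler.tick(now=30)   # 30 seconds elapsed, due again

# Remove the task at runtime: delete the row.
conn.execute("DELETE FROM periodic_task WHERE name = 'add'")
fourth = scheduler.tick(now=60)  # nothing is scheduled any more
```

Because the table is consulted on every tick, adding or removing a row changes what runs without restarting anything. A real scheduler would presumably avoid re-querying on every tick; this sketch skips any such optimization.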

Pietra answered 19/4, 2012 at 10:0 Comment(2)
Can you please show the code to add and remove a task? Sorry, I am not getting it.Assurgent
Any changes in this from 2012 to 2016?Clouse
R
44

This question was answered on Google Groups.

I AM NOT THE AUTHOR, all credit goes to Jean Mark

Here's a proper solution for this, confirmed working. In my scenario, I sub-classed Periodic Task and created a model out of it, since I can add other fields to the model as I need and could also add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The subclassing is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't subclass it), you can just use PeriodicTask.objects.filter(name=...) to search for your task, disable it, then delete it.

Hope this helps!

import json
from datetime import datetime

from django.db import models
from djcelery.models import PeriodicTask, IntervalSchedule


class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
        """Schedule a task by name every "every" "period". An example call would be:
             TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])
           which schedules your custom task to run every 30 seconds with the
           arguments 1, 2 and 3 passed to the actual task.
        """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.now())  # create some name for the periodic task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules:  # check whether a matching interval schedule already exists and reuse it
            interval_schedule = interval_schedules[0]
        else:  # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every  # should check to make sure this is a positive int
            interval_schedule.period = period
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = json.dumps(args)  # djcelery stores args as JSON-encoded text
        if kwargs:
            ptask.kwargs = json.dumps(kwargs)  # same for kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        """disables the task, then deletes it"""
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
Rabb answered 8/7, 2013 at 10:55 Comment(1)
@kai IntervalSchedule, PeriodicTask, etc, are djcelery classes, and the OP says he's not using djcelery. Definitely useful nonetheless.Teleprinter
S
31

This was finally made possible by a fix included in celery v4.1.0. Now, you just need to change the schedule entries in the database backend, and celery-beat will act according to the new schedule.

The docs vaguely describe how this works. The default scheduler for celery-beat, PersistentScheduler, uses a shelve file as its schedule database. Any changes to the beat_schedule dictionary in the PersistentScheduler instance are synced with this database (by default, every 3 minutes), and vice-versa. The docs describe how to add new entries to the beat_schedule using app.add_periodic_task. To modify an existing entry, just add a new entry with the same name. Delete an entry as you would from a dictionary: del app.conf.beat_schedule['name'].
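The add, modify and delete operations described above are ordinary dictionary operations. The sketch below uses a plain dict as a stand-in for `app.conf.beat_schedule`, so it runs without a Celery app. The entry keys `task`, `schedule` and `args` follow the documented beat_schedule layout, while the entry name `add-every-30-seconds` and the task name `tasks.add` are hypothetical. It only shows the dictionary mechanics and says nothing about when a running beat process notices the change.

```python
# A plain dict standing in for app.conf.beat_schedule (assumption: no Celery app here).
beat_schedule = {}

# Add an entry: name -> task name, interval in seconds, positional arguments.
beat_schedule["add-every-30-seconds"] = {
    "task": "tasks.add",
    "schedule": 30.0,
    "args": (16, 16),
}

# Modify: assigning under the same name replaces the old entry.
beat_schedule["add-every-30-seconds"] = {
    "task": "tasks.add",
    "schedule": 10.0,
    "args": (1, 2),
}
schedule_after_update = beat_schedule["add-every-30-seconds"]["schedule"]

# Delete: remove the key, as with any dictionary.
del beat_schedule["add-every-30-seconds"]
```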

Suppose you want to monitor and modify your celery beat schedule using an external app. Then you have several options:

  1. You can open the shelve database file and read its contents like a dictionary. Write back to this file for modifications.
  2. You can run another instance of the Celery app, and use that one to modify the shelve file as described above.
  3. You can use the custom scheduler class from django-celery-beat to store the schedule in a django-managed database, and access the entries there.
  4. You can use the scheduler from celerybeat-mongo to store the schedule in a MongoDB backend, and access the entries there.
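For option 1, the standard-library `shelve` module gives dictionary-style access to such a file. The sketch below builds its own stand-in file so that it is self-contained. The `entries` key and the plain-dict entry format are assumptions made for illustration (a real schedule file stores pickled entry objects), and the file name is hypothetical. As the comments on this answer point out, opening the file while beat is running can fail with a locking error, so treat this as something to try on a copy or while beat is stopped.

```python
import os
import shelve
import tempfile

# Build a stand-in schedule file so the example is self-contained.
# Assumption: the entries live under an "entries" key.
path = os.path.join(tempfile.mkdtemp(), "celerybeat-schedule-demo")
with shelve.open(path) as db:
    db["entries"] = {"add-every-30-seconds": {"task": "tasks.add", "schedule": 30.0}}

# Read the contents like a dictionary.
with shelve.open(path, flag="r") as db:
    entries = dict(db["entries"])

# Write a modification: change a copy, then reassign it so shelve persists it.
with shelve.open(path) as db:
    updated = dict(db["entries"])
    del updated["add-every-30-seconds"]
    db["entries"] = updated

with shelve.open(path, flag="r") as db:
    remaining = dict(db["entries"])
```

Reassigning `db["entries"]` matters: mutating the object returned by `db["entries"]` in place is not written back unless the shelf is opened with `writeback=True`.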
Simonesimoneau answered 10/4, 2019 at 0:36 Comment(7)
Great solution!!Reiss
Late comment, but I don't understand how this can be done in a true dynamic fashion; i.e. after my application receives an API call, THEN make it configure the periodic task. From the code examples, it seems like it is always evaluated during function definition (with the decorator).Hanafee
For example, when I try this: _gdbm.error: [Errno 11] Resource temporarily unavailable. So it seems like while celery is running I can't seem to open the file through shelve.open(file).Hanafee
@Tristan Brown good solution, do you have any non django specific example?Solnit
I added an answer for non-django applications. See https://mcmap.net/q/153250/-how-to-dynamically-add-remove-periodic-tasks-to-celery-celerybeatRimose
The shelve module does not support concurrent read/write access to shelved objects. Options 1 and 2 will not work.Stocky
These are not production-grade solutions. The shelve file will not update dynamically: when a stand-alone service is using it and your Flask app tries to update it, it will throw a resource error.Configurationism
H
9

There is a library called django-celery-beat which provides the models one needs. To make it dynamically load new periodic tasks, one has to create one's own scheduler.

from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
Hyaena answered 3/5, 2017 at 6:15 Comment(1)
Thanks. Didn't work straight away but using to_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] and similar for to_remove did the trick. Why isn't this a standard option? Until now, I've had to have Celery tasks calls other Celery tasks with a countdown. That doesn't sound very good to me.Mclain
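The key comparison inside `tick`, written in the list-comprehension form from the comment above, can be tried in isolation with plain dictionaries. The helper names `diff_schedules` and `apply_diff` and the sample entries are hypothetical and exist only for this sketch.

```python
def diff_schedules(current, new):
    """Return (to_add, to_remove) key lists between two schedule mappings."""
    to_add = [key for key in new if key not in current]
    to_remove = [key for key in current if key not in new]
    return to_add, to_remove


def apply_diff(current, new):
    """Update `current` in place so that its keys match those of `new`."""
    to_add, to_remove = diff_schedules(current, new)
    for key in to_add:
        current[key] = new[key]
    for key in to_remove:
        del current[key]
    return current


current = {"cleanup": "every 1h", "report": "every 24h"}
new = {"cleanup": "every 2h", "monitor": "every 10s"}
to_add, to_remove = diff_schedules(current, new)
apply_diff(current, new)
```

Note that, like the loop in the answer, this only adds and removes names. An entry whose name stays the same (`cleanup` here) keeps its old value, so a changed interval under an existing name is not picked up by this comparison alone.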
W
5

I was looking for the same kind of solution for Celery + Redis, one that allows tasks to be added and removed flexibly. Check out redbeat, from the same guy at Heroku; it also supports Redis + Sentinel.

Hope this helps :)

Wilk answered 9/3, 2020 at 5:25 Comment(1)
This is an appropriate solution if you are not using Django and instead use Redis for the Celery backend.Triton
R
5

The answer from @asksol is what's needed if in a Django application.

For non-Django applications, you can use celery-sqlalchemy-scheduler, which is modeled on django-celery-beat: it also stores the schedule in a database instead of the celerybeat-schedule file.

Here is an example with runtime addition of a new task.

tasks.py

from celery import Celery

celery = Celery('tasks')

beat_dburi = 'sqlite:///schedule.db'

celery.conf.update(
    {'beat_dburi': beat_dburi}
)


@celery.task
def my_task(arg1, arg2, be_careful):
    print(f"{arg1} {arg2} be_careful {be_careful}")

Logs (Producer)

$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...

Logs (Consumer)

$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

Database schedules

$ sqlite3 schedule.db 
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule       celery_periodic_task_changed
celery_interval_schedule      celery_solar_schedule       
celery_periodic_task        
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|

Now, while those workers are already running, let's update the schedules by adding a new scheduled task. Note that this is at runtime, without the need to restart the workers.

$ python3
>>> # Setup the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>> 
>>> # Setup the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
...     session.add(schedule)
...     session.commit()
... 
>>> 
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
...     interval=schedule,                  # we created this above.
...     name='My task',                     # simply describes this periodic task.
...     task='tasks.my_task',               # name of task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...        'be_careful': True,
...     }),
... )
>>> session.add(periodic_task)
>>> session.commit()
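The `args` and `kwargs` values are passed as JSON text, which is why the session above runs them through `json.dumps`. A small standard-library sketch of that round trip follows. The local `my_task` is a stand-in with the same signature as `tasks.my_task` that returns its message instead of printing it, and the way the scheduler decodes the columns internally is an assumption here.

```python
import json

# Serialize the arguments the same way as in the session above.
args_text = json.dumps(["arg1", "arg2"])
kwargs_text = json.dumps({"be_careful": True})


def my_task(arg1, arg2, be_careful):
    # Same signature as tasks.my_task, but returns the string instead of printing it.
    return f"{arg1} {arg2} be_careful {be_careful}"


# Decoding the stored text gives back Python values that can be
# unpacked into the task call.
result = my_task(*json.loads(args_text), **json.loads(kwargs_text))
```

The JSON `true` in the stored kwargs becomes Python's `True` again after decoding, which matches the `be_careful True` text in the consumer logs.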

Database schedules (updated)

  • We can now see that the newly added schedule is reflected in the database, which the celery beat scheduler reads continuously. So if the values of args or kwargs need to change, we can perform SQL updates on the database and they should take effect in real time on the running workers (without a restart).
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|

Logs (Producer)

  • Now, the new task is being enqueued every 10 seconds
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)

Logs (Consumer)

  • The newly added task is correctly executed on time every 10 seconds
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None
Rimose answered 20/8, 2021 at 7:32 Comment(2)
Is it a production-grade lib?Configurationism
It has not been working, getting db error after commit: sqlalchemy.exc.ArgumentError: Column expression, FROM clause, or other columns clause element expected, got [<class 'celery_sqlalchemy_scheduler.models.PeriodicTaskChanged'>]. Did you mean to say select(<class 'celery_sqlalchemy_scheduler.models.PeriodicTaskChanged'>)?. Also, the author archived the repoDaria
C
4

You can check out flask-djcelery, which configures Flask and djcelery and also provides a browsable REST API.

Corollaceous answered 16/11, 2016 at 20:41 Comment(0)
B
3

Some time ago I needed to dynamically update periodic tasks in Celery and Django, and I wrote an article about my approach (code for article).

I was using the django-celery-beat package. It provides database models for PeriodicTask and IntervalSchedule. By manipulating PeriodicTask objects, you can add, remove, update, or pause periodic tasks in Celery.

Create periodic task

import json

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# `instance` comes from the surrounding code, which is not shown here

schedule, created = IntervalSchedule.objects.get_or_create(
    every=instance.interval,
    period=IntervalSchedule.SECONDS,
)

task = PeriodicTask.objects.create(
    interval=schedule,
    name=f"Monitor: {instance.endpoint}",
    task="monitors.tasks.task_monitor",
    kwargs=json.dumps(
        {
            "monitor_id": instance.id,
        }
    ),
)

Remove periodic task

PeriodicTask.objects.get(pk=task_id).delete()

Change interval in a periodic task

task = PeriodicTask.objects.get(pk=your_id)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=new_interval,
    period=IntervalSchedule.SECONDS,
)
task.interval = schedule
task.save()

Pause periodic task

task = PeriodicTask.objects.get(pk=your_id)
task.enabled = False
task.save()

Beat service

When using django-celery-beat, you need to pass the scheduler argument when starting the beat service:

celery -A backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval 10
Beabeach answered 18/10, 2022 at 9:2 Comment(0)
U
-1

Celery can implement dynamic periodic tasks with a database and a task that re-enqueues itself.

But APScheduler is better.

That is because a dynamic periodic task implemented this way always involves a long countdown or eta. Too many such periodic tasks can take up a lot of memory, which makes restarts and the execution of non-delayed tasks slow.

tasks.py

import sqlite3
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    imports=['tasks'],
)

conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks` 
(
   `id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
   `name` TEXT,
   `countdown` INTEGER
);
'''
c.execute(sql)


def create(name='job', countdown=5):
    sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
    c.execute(sql, (name, countdown))
    conn.commit()
    return c.lastrowid


def read(id=None, verbose=False):
    if id:
        # use a parameterized query instead of string formatting
        sql = 'SELECT * FROM `tasks` WHERE `id`=?'
        all_rows = c.execute(sql, (id,)).fetchall()
    else:
        all_rows = c.execute('SELECT * FROM `tasks`').fetchall()
    if verbose:
        print(all_rows)
    return all_rows


def update(id, countdown):
    sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
    c.execute(sql, (countdown, id))
    conn.commit()


def delete(id, verbose=False):
    sql = 'DELETE FROM `tasks` WHERE `id`=?'
    affected_rows = c.execute(sql, (id,)).rowcount
    if verbose:
        print('deleted {} rows'.format(affected_rows))
    conn.commit()


@app.task
def job(id):
    rows = read(id)  # empty once the row has been deleted
    if rows:
        id, name, countdown = rows[0]
    else:
        logger.info('stop')
        return

    logger.warning('id={}'.format(id))
    logger.warning('name={}'.format(name))
    logger.warning('countdown={}'.format(countdown))

    job.apply_async(args=(id,), countdown=countdown)

main.py

from tasks import *

id = create(name='job', countdown=5)
job(id)
# job.apply_async((id,), countdown=5)  # wait 5s

print(read())

input('enter to update')
update(id, countdown=1)

input('enter to delete')
delete(id, verbose=True)
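The self-rescheduling pattern above can be tried without a broker or worker. The sketch below is a stand-in that uses the standard-library `sched` module with a manual clock, so it finishes instantly. The dict `store` plays the role of the `tasks` table, and `FakeClock`, `runs` and the chosen times are hypothetical names and values for illustration; it is not Celery code.

```python
import sched


class FakeClock:
    """Manual clock so the example runs instantly and deterministically."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)

store = {1: 5}  # stand-in for the `tasks` table: id -> countdown in seconds
runs = []       # (time, task id) recorded for each execution


def job(task_id):
    countdown = store.get(task_id)
    if countdown is None:
        return  # the row was deleted, so the chain stops here
    runs.append((clock.time(), task_id))
    # Re-enqueue itself, like job.apply_async(args=(id,), countdown=countdown).
    scheduler.enter(countdown, 1, job, argument=(task_id,))


def update(task_id, countdown):
    store[task_id] = countdown


def delete(task_id):
    store.pop(task_id, None)


scheduler.enter(0, 1, job, argument=(1,))
scheduler.enter(12, 1, update, argument=(1, 1))  # at t=12, shorten the countdown to 1
scheduler.enter(17.5, 1, delete, argument=(1,))  # at t=17.5, remove the task
scheduler.run()
```

In this run the update made at t=12 only takes effect at the next execution at t=15, because the pending call was already enqueued with the old countdown. With long countdowns that delay grows accordingly, which is one practical cost of this pattern.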
Unravel answered 20/4, 2021 at 6:21 Comment(1)
APS scheduler does support distributed flask appsConfigurationism
