The answer from @asksol is what you need if you are in a Django application.
For non-Django applications, you can use celery-sqlalchemy-scheduler, which is modeled after django-celery-beat since it also stores the schedules in a database instead of the local celerybeat-schedule file.
Here is an example with runtime addition of a new task.
tasks.py
from celery import Celery

celery = Celery('tasks')

# Store the beat schedules in a database instead of the celerybeat-schedule file.
beat_dburi = 'sqlite:///schedule.db'
celery.conf.update(
    {'beat_dburi': beat_dburi}
)


@celery.task
def my_task(arg1, arg2, be_careful):
    print(f"{arg1} {arg2} be_careful {be_careful}")
Logs (Producer)
$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...
Logs (Consumer)
$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
Database schedules
$ sqlite3 schedule.db
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule celery_periodic_task_changed
celery_interval_schedule celery_solar_schedule
celery_periodic_task
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
Now, while those processes are already running, let's update the schedules by adding a new scheduled task. Note that this happens at runtime, without restarting the workers.
$ python3
>>> # Set up the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>>
>>> # Set up the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
...     session.add(schedule)
...     session.commit()
...
>>>
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
...     interval=schedule,  # we created this above.
...     name='My task',  # simply describes this periodic task.
...     task='tasks.my_task',  # name of the task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...         'be_careful': True,
...     }),
... )
>>> session.add(periodic_task)
>>> session.commit()
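A cron-style schedule can be created the same way. The following is only a sketch: the CrontabSchedule model fields and the crontab relationship on PeriodicTask are assumptions mirrored from django-celery-beat, though the celery_crontab_schedule table listed above suggests they exist here too.
>>> # Assumed API, mirrored from django-celery-beat: CrontabSchedule fields
>>> # (minute, hour, ...) and the PeriodicTask.crontab relationship.
>>> from celery_sqlalchemy_scheduler.models import CrontabSchedule
>>> crontab = CrontabSchedule(minute='30', hour='7')  # every day at 07:30
>>> session.add(crontab)
>>> session.commit()
>>> session.add(PeriodicTask(crontab=crontab, name='My cron task', task='tasks.my_task'))
>>> session.commit()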
Database schedules (updated)
- We can now see that the newly added schedule is reflected in the database, which is continuously read by the celery beat scheduler. So if the values of the args or kwargs ever need to change, we can simply run SQL updates against the database and they take effect in real time with the running workers, without a restart (see the example after the output below).
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|
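For example, changing the kwargs of the task above is a single SQL update, which beat picks up on its next read. The column names here are an assumption based on the model attributes used earlier (name, kwargs):
sqlite> UPDATE celery_periodic_task
   ...> SET kwargs = '{"be_careful": false}'
   ...> WHERE name = 'My task';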
Logs (Producer)
- Now the new task is enqueued every 10 seconds:
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
Logs (Consumer)
- The newly added task is executed correctly, on time, every 10 seconds:
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None
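Pausing or removing the task at runtime works through the same session. This is a sketch, assuming PeriodicTask exposes an enabled flag like its django-celery-beat counterpart:
>>> task = session.query(PeriodicTask).filter_by(name='My task').first()
>>> task.enabled = False  # assumed flag, mirroring django-celery-beat: pauses the task
>>> session.commit()      # beat picks this up from the database, no restart needed
>>> # or remove it entirely:
>>> # session.delete(task); session.commit()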