I'm working on a web application that lets users create events (one-off or recurring) on a calendar; shortly before an event starts, the system notifies its participants. I'm having trouble designing the flow for these notifications, particularly for recurring events.
Things to consider:
- The application's architecture uses many databases of the same structure, each of which keeps its own set of users and events. Thus any query against one database needs to be made against several thousand others.
- A recurring event may have excluded dates (similar to an RRULE and EXDATE combination).
- Users can update an event's time or recurrence rule.
- The application is written in Python and already uses Celery 3.1 with a Redis broker. Solutions that work with this setup would be nice, though anything will do. From what I have found, it is currently hard to add periodic tasks dynamically with Celery.
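To make the "recurrence with excluded dates" point concrete, here is a minimal sketch of the check involved. It is not the application's actual code: `occurrence_on` and its parameters are hypothetical names, and it assumes a fixed interval in days rather than a full RRULE.

```python
from datetime import date, datetime


def occurrence_on(day, first_start, interval_days, excluded_dates):
    """Return the start datetime of the occurrence on `day`, or None.

    Hypothetical helper: the recurrence is simplified to "every
    `interval_days` days starting at `first_start`", minus excluded dates.
    """
    delta = (day - first_start.date()).days
    if delta < 0 or delta % interval_days != 0:
        return None  # `day` is before the series or not on the recurrence grid
    if day in excluded_dates:
        return None  # occurrence explicitly excluded (EXDATE-like)
    return datetime.combine(day, first_start.time())


first = datetime(2015, 3, 2, 9, 30)
excluded = {date(2015, 3, 16)}

print(occurrence_on(date(2015, 3, 9), first, 7, excluded))   # 2015-03-09 09:30:00
print(occurrence_on(date(2015, 3, 16), first, 7, excluded))  # None (excluded)
print(occurrence_on(date(2015, 3, 10), first, 7, excluded))  # None (off the grid)
```

Whatever computes "does this event occur today, and when" has to be re-evaluated whenever the rule, the start time, or the excluded dates change.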
A solution I'm attempting:
- A periodic task runs once a day, scans every database, and adds a notification task, set to run at the appropriate time, for each event that has an occurrence that day.
- Each task generated this way has its id saved temporarily in Redis. If a user changes the event time for that day after its notification task has been scheduled, the task is revoked and replaced with a new one.
Sample code for the above solution:

In tasks.py, all the tasks to run:

    from datetime import datetime

    from celery.result import AsyncResult
    from celery.task import task as celery_task

    # ... (setup of system, Account, Event and the redis helper omitted)


    @celery_task
    def create_notify_task():
        # Daily scan: visit every account database and schedule today's notifications.
        for account in system.query(Account):
            db_session = account.get_session()  # get SQLAlchemy session
            for event in db_session.query(Event):
                schedule_notify_event(account, event)


    @celery_task(name='notify_event_users')
    def notify_event_users(account_id, event_id):
        # do notification for every event participant
        pass


    def schedule_notify_event(account, event):
        # Occurrence of the event that falls on today, if any.
        partial_event = event.get_partial_on(datetime.today())
        if partial_event:
            result = notify_event_users.apply_async(
                args=(account.id, event.id),
                eta=partial_event.start)
            replace_task_id(account.id, event.id, result.id)
        else:
            replace_task_id(account.id, event.id, None)


    def replace_task_id(account_id, event_id, result_id):
        key = '{}:event'.format(account_id)
        client = redis.get_client()
        old_result_id = client.hget(key, event_id)
        if old_result_id:
            # Cancel the previously scheduled notification for this event.
            AsyncResult(old_result_id).revoke()
        if result_id is None:
            # No occurrence today: drop the entry instead of storing None.
            client.hdel(key, event_id)
        else:
            client.hset(key, event_id, result_id)
In event.py:

    from tasks import schedule_notify_event


    # when a user changes an event's time
    def update_event(event, data):
        # ...
        # update event
        # ...
        schedule_notify_event(account, event)
Celery setup file:

    from celery.schedules import crontab

    CELERYBEAT_SCHEDULE = {
        'create-notify-every-day': {
            'task': 'tasks.create_notify_task',
            'schedule': crontab(minute=0, hour=0),
            'args': (),
        },
    }
Some downsides of the above are:
- The daily task can take a long time to run. Events in the databases processed last have to wait, and their notifications might be missed. Scheduling the task earlier (e.g. 2 hours before the next day) may alleviate this; however, the setup for the first run (or after a server restart) is a little awkward.
- Care must be taken so that a notify task doesn't get scheduled twice for the same event (e.g. because create_notify_task is run more than once a day).
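To illustrate the double-scheduling concern, here is a minimal sketch of a guard keyed on (account, event, day). `NotifyRegistry` and `claim` are hypothetical names, and a plain dict stands in for the Redis hash; with several workers the check-and-record step would have to be atomic in Redis, which this sketch does not show.

```python
from datetime import datetime


class NotifyRegistry(object):
    """In-memory stand-in for the Redis bookkeeping (hypothetical).

    Maps (account_id, event_id, day) to the start time that was scheduled.
    """

    def __init__(self):
        self._scheduled = {}

    def claim(self, account_id, event_id, start):
        """Return True if a notify task should be (re)scheduled for `start`."""
        key = (account_id, event_id, start.date())
        if self._scheduled.get(key) == start:
            return False  # same occurrence already scheduled: skip
        self._scheduled[key] = start
        return True  # new or changed occurrence: schedule (or replace) it


registry = NotifyRegistry()
start = datetime(2015, 3, 9, 9, 30)

print(registry.claim(1, 42, start))                   # True  (first scan)
print(registry.claim(1, 42, start))                   # False (scan ran again)
print(registry.claim(1, 42, start.replace(hour=11)))  # True  (time was changed)
```

Including the day in the key means a repeated scan, or a scan that runs a couple of hours before midnight for the following day, does not create a second task for the same occurrence.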
Is there a more sensible approach to this?