Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers

We have a web app made with Pyramid and served through Gunicorn+nginx. It runs with 8 worker processes.

We needed to run scheduled jobs, so we chose APScheduler. Here is how we launch it:

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run, seconds=JOB_INTERVAL)

The issue is that all of Gunicorn's worker processes pick up the scheduler, so every job ends up running once per worker. We tried implementing a file lock, but it does not seem like a good enough solution. What would be the best way to make sure that, at any given time, only one of the worker processes picks up the scheduled event, and no other process picks it up until the next JOB_INTERVAL?

The solution needs to work even with mod_wsgi, in case we decide to switch to Apache2+mod_wsgi later. It also needs to work with the single-process development server, which is Waitress.

Update from the bounty sponsor

I'm facing the same issue described by the OP, just with a Django app. I'm fairly sure adding this detail won't change much of the original question. For this reason, and to gain a bit more visibility, I have also tagged this question with django.

Haygood answered 17/4, 2013 at 6:48 Comment(5)
Where do the jobs come from? Do web requests sometimes add new jobs?Reaper
No. It is just a job that monitors a resource and takes action based on the resource state. The resource state is modified by the request.Haygood
Moreover the scheduler job is added in the __init__.py of the applicationHaygood
Wow, I do exactly the same thing in my app except I hadn't foreseen this problem because I'm only in development with Waitress. Keeping my eye on this post!Scotsman
Look at my solution to this problem: https://mcmap.net/q/153249/-apscheduler-is-executing-job-multiple-timesCute

Because Gunicorn starts with 8 workers (in your example), it forks the app into 8 processes. These 8 processes are forked from the Master process, which monitors each of their statuses and has the ability to add or remove workers.

Each process gets a copy of your APScheduler object, which initially is an exact copy of the Master process's APScheduler. With "n" workers, each job therefore ends up being executed "n" times in total, once per worker.

A hack around this is to run gunicorn with the following options:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

The --preload flag tells Gunicorn to "load the app before forking the worker processes". By doing so, each worker is "given a copy of the app, already instantiated by the Master, rather than instantiating the app itself". This means the following code only executes once in the Master process:

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run, seconds=JOB_INTERVAL)

Additionally, we need to set the jobstore to be anything other than :memory:. This way, although each worker is its own independent process, unable to communicate with the other 7, by using a local database (rather than memory) we guarantee a single point of truth for CRUD operations on the jobstore.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Persist jobs in a local SQLite database instead of the default in-memory store.
rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_job(job_to_be_run, 'interval', seconds=JOB_INTERVAL)

Lastly, we want to use the BackgroundScheduler because of its implementation of start(). When we call start() on the BackgroundScheduler, a new thread is spun up in the background, which is responsible for scheduling and executing jobs. This is significant because, as noted above, due to our --preload flag we only execute start() once, in the Master Gunicorn process. By definition, forked processes do not inherit the threads of their parent, so no worker runs the BackgroundScheduler thread.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# BackgroundScheduler.start() spins up the scheduling thread; with --preload this
# happens only in the Master process, and forked workers don't inherit the thread.
rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_job(job_to_be_run, 'interval', seconds=JOB_INTERVAL)

As a result of all of this, every Gunicorn worker has an APScheduler that has been tricked into a "STARTED" state, but isn't actually running, because it dropped its parent's threads! Each instance is also capable of updating the jobstore database, just not executing any jobs!

Check out flask-APScheduler for a quick way to run APScheduler in a web-server (like Gunicorn), and enable CRUD operations for each job.

Ciri answered 20/10, 2016 at 18:47 Comment(5)
The answer does make sense, and I can confirm that the .start() part of my code only gets called once when done with --preload, but some of the tasks do get triggered twice (even though my worker configuration is -w 4)Delve
If you have a gunicorn python configuration file, you can create a function called on_starting that will only be executed by the gunicorn master process and not by the worker processes. You could also create and start the scheduler there. Check out github.com/mlsecproject/gglsbl-rest/blob/master/config.py for an example.Windstorm
env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload If I have a domain name, should it be env/bin/gunicorn module_containing_app:app -b domain.example.com --workers 3 --preload ? Also, I don't have a folder env/bin/gunicornGreenstein
Running schedulers in the parent gunicorn process is a low-key very dangerous idea. When gunicorn forks into workers, it reuses existing network sockets, notably it reuses DB connections. This means that when a worker is forked after the scheduler has initialized at least one connection in the sqlalchemy pool, the parent gunicorn process and the worker will talk to the DB via the same socket, which will systematically cause conflicts even under average load. Also note that gunicorn can fork workers later, e.g. when workers start hanging. Don't do this. It cost me a lot of hours to track down some tricky issuesMccary
I am facing the exact same issue here. I have some "APScheduler" tasks (4) doing SSE-related tasks and publishing those to front-end clients. Is it best to just separate into 2 different apps? 1 that just handles the SSE/APScheduler stuff (using a single gunicorn worker thread), and a new separate app for the web dashboard/UI that will receive the SSE published messages?Convalesce

I found a workaround that worked with a Django project having a very similar issue. I simply bind a TCP socket the first time the scheduler starts and check against it subsequently. I think the following code can work for you as well with minor tweaks.

import socket

try:
    # Bind a well-known local port; only the first process to do this succeeds.
    # The module-level reference keeps the port bound for the process lifetime.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print("!!!scheduler already started, DO NOTHING")
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print("scheduler started")
Essene answered 4/12, 2014 at 20:52 Comment(5)
I had no idea you updated the question. I could not solve the problem at all and I switched to celery. You might want to consider it because I feel celery gives you a lot more power once you have gotten your head around it. You have the option of running scheduled jobs with celery beat.Haygood
However I am accepting your answer because you seem to have found a solution and in future it might help peopleHaygood
@shortfellow I updated the question because after offering the bounty I realized that with a minor change would also describe my use case well without altering your question too much. If my answer didn't help just don't accept it. The celery suggestion worth a look, thanks.Essene
Seems to me the difference between Celery and APScheduler is that APScheduler gives you a lot more flexibility in adding and removing cron type jobs on the fly whereas to add and remove jobs from Celerybeat requires reconfiguration and restart: #10195475 versus pythonadventures.wordpress.com/2013/08/06/apscheduler-examples (see cron-style)Piave
@GeekSambhu no idea sorryEssene

Short answer: You can't do it properly without consequences.

I'm using Gunicorn as an example, but it is essentially the same for uWSGI. There are various hacks for running multiple processes, to name a few (a rough sketch of the hook-based approach follows the list):

  1. use --preload option
  2. use on_starting hook to start the APScheduler background scheduler
  3. use when_ready hook to start the APScheduler background scheduler
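
For reference, hacks (2) and (3) boil down to a Gunicorn config file along these lines. This is only a rough sketch: on_starting and when_ready are real Gunicorn server hooks, but myapp.jobs, job_to_be_run and JOB_INTERVAL are placeholders for your own code, and the caveats below (and in the comments on the previous answer about the master sharing DB connections with forked workers) still apply.

# gunicorn.conf.py  (run with: gunicorn -c gunicorn.conf.py module_containing_app:app)
from apscheduler.schedulers.background import BackgroundScheduler


def on_starting(server):
    # Called once in the Gunicorn master process, before any worker is forked.
    # (when_ready has the same signature but fires after the server has started.)
    from myapp.jobs import job_to_be_run, JOB_INTERVAL  # placeholder module

    scheduler = BackgroundScheduler()
    scheduler.add_job(job_to_be_run, 'interval', seconds=JOB_INTERVAL)
    scheduler.start()
    server.scheduler = scheduler  # keep a reference so it isn't garbage-collected
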

They work to some extent, but you may run into the following problems:

  1. worker timing out frequently
  2. scheduler hanging when there are no jobs https://github.com/agronholm/apscheduler/issues/305

APScheduler is designed to run in a single process, where it has complete control over adding jobs to the job stores. It uses threading.Event's wait() and set() methods to coordinate: adding a job wakes the scheduler thread so it can recompute its next wakeup time. If the scheduler and the code that adds jobs run in different processes, that coordination simply doesn't happen.
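
To see why, here is a tiny self-contained demonstration (not APScheduler code, and POSIX-only because it uses os.fork(), which is also what Gunicorn does when spawning workers): an Event set in one process never wakes a wait() in another, because each process has its own copy after the fork.

import os
import threading
import time

wakeup = threading.Event()

if os.fork() == 0:
    # Child process: waits on its own, never-set copy of the Event.
    print("child saw the event:", wakeup.wait(timeout=2))  # prints False after 2s
    os._exit(0)
else:
    time.sleep(0.5)
    wakeup.set()   # sets only the parent's copy of the Event
    os.wait()      # reap the child
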

It is possible to run it in Gunicorn in a single process.

  1. use only one worker process
  2. use the post_worker_init hook to start the scheduler; this makes sure the scheduler runs only in the worker process, not in the master process (see the sketch after this list)
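
A minimal sketch of that single-worker setup (again, myapp.jobs, job_to_be_run and JOB_INTERVAL are placeholders for your own code; workers and post_worker_init are standard Gunicorn config settings):

# gunicorn.conf.py  (run with: gunicorn -c gunicorn.conf.py module_containing_app:app)
from apscheduler.schedulers.background import BackgroundScheduler

workers = 1  # the whole point: exactly one process, so exactly one scheduler


def post_worker_init(worker):
    # Called in the worker process after the app has loaded, never in the master,
    # so the scheduler thread lives in the same process that serves requests.
    from myapp.jobs import job_to_be_run, JOB_INTERVAL  # placeholder module

    scheduler = BackgroundScheduler()
    scheduler.add_job(job_to_be_run, 'interval', seconds=JOB_INTERVAL)
    scheduler.start()
    worker.scheduler = scheduler  # keep a reference on the worker object
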

The author also pointed out that sharing the job store among multiple processes isn't possible: https://apscheduler.readthedocs.io/en/stable/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes He also provided a solution using RPyC.

While it's entirely doable to wrap APScheduler with a REST interface, you might want to consider serving it as a standalone app with one worker. In other words, if you have other endpoints, put them in a separate app where you can use multiple workers.

Aldus answered 8/9, 2020 at 15:59 Comment(0)
