APScheduler is executing a job multiple times
I have a Django application running under uWSGI (with 10 workers) behind nginx, and I am using APScheduler for scheduling. Whenever I schedule a job, it is executed multiple times. From these answers (ans1, ans2) I learned that this happens because the scheduler is started in each uWSGI worker. I tried initializing the scheduler conditionally, first by binding it to a socket as suggested in this answer and then by keeping a status flag in the database, so that only one scheduler instance is started. The problem persists, and in addition, sometimes when I create a job the scheduler is found not to be running, so the job stays pending and is never executed.

I initialize APScheduler in the urls module of the Django application with the following code, so the scheduler starts when the application starts.

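from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler

# TIME_ZONE, client and scheduler_db_conn are defined elsewhere in the project.


def job_listener(ev):
    print('event', ev)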


job_defaults = {
    'coalesce': True,  
    'max_instances': 1
}

scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=TIME_ZONE, daemon=False)
scheduler.add_jobstore(MongoDBJobStore(client=client), 'default')
scheduler.add_executor(ThreadPoolExecutor(), 'default')
scheduler.add_executor(ProcessPoolExecutor(),'processpool')
scheduler.add_listener(job_listener)


def initialize_scheduler():
    try:
        if scheduler_db_conn.find_one():
            print('scheduler already running')
            return True
        scheduler.start()
        scheduler_db_conn.save({'status': True})
        print('---------------scheduler started --------------->')
        return True
    except:
        return False

I use the following code to create a job.

from scheduler_conf import scheduler
def create_job(arg_list):
    try:
        print('scheduler status-->',scheduler.running)
        job = scheduler.add_job(**arg_list)
        return True
    except:
        print('error in creating Job')
        return False

I am not able to configure and run the scheduler properly. I have read all the related APScheduler threads but still have not found a solution.

  • If I don't prevent a scheduler from running in each worker, the job is executed multiple times.
  • But if I limit it to a single scheduler running inside one worker, some jobs stay pending and are never executed.

What's the solution for this?

Guppy answered 31/8, 2016 at 15:25 Comment(0)
F
7

Let's consider the following facts:

(1) uWSGI, by default, pre-loads your Django app into the uWSGI master process's memory BEFORE forking its workers.

(2) uWSGI "forks" workers from the master, meaning the master's memory is essentially copied into each worker. Because of how fork() is implemented, a child process (i.e. a worker) does not inherit the threads of its parent; only the thread that called fork() exists in the child.

(3) When you call BackgroundScheduler.start(), a thread is created that is responsible for executing jobs in whatever process (worker or master) calls this function.
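
Fact (2) can be checked with the standard library alone. The following is a minimal sketch, assuming a POSIX system where os.fork() is available. The helper name count_threads_after_fork is made up for illustration, and the background thread merely stands in for the one that BackgroundScheduler.start() would create.

```python
import os
import threading


def count_threads_after_fork():
    """Start a background thread, fork, and report the thread count seen on each side."""
    stop = threading.Event()
    # Stand-in for the scheduler thread: it just waits until told to stop.
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()
    parent_count = threading.active_count()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: only the thread that called fork() exists here.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os.close(write_fd)
        os._exit(0)

    # Parent process: read the child's thread count, then clean up.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return parent_count, child_count


if __name__ == "__main__":
    parent_count, child_count = count_threads_after_fork()
    print("threads in parent:", parent_count)
    print("threads in child:", child_count)
```

The child reports a single thread even though the parent had at least two, which is why a worker forked after start() does not run the scheduler. Recent Python versions may emit a DeprecationWarning about calling fork() in a multi-threaded process; that is expected for this demonstration.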

All you need to do is call BackgroundScheduler.start() in the master process, before any workers are created. That way, when the workers are created, they WILL NOT INHERIT the BackgroundScheduler thread (#2 above), and thus will not execute any jobs (but they can still schedule/modify/delete jobs by communicating with the jobstore!).

To do this, make sure you call BackgroundScheduler.start() in whatever function/module instantiates your app. For instance, in the following Django project structure, we'd (likely) want to execute this code in wsgi.py, which is the entry point for the uWSGI server:

mysite/
    manage.py
    mysite/
        __init__.py
        settings.py
        urls.py
        wsgi.py

Pitfalls:

Don't "initializ[e] apscheduler in urls of the django application.... This will start the scheduler when application starts." That module may be loaded by each worker, in which case start() is executed multiple times.

Don't start the uWSGI server in "lazy-apps" mode (the --lazy-apps option). It loads the app in each of the workers after they are created.

Don't run the BackgroundScheduler with the default (memory) jobstore. This will create split-brain syndrome between the workers. You want to enforce a single point of truth, as you are doing with MongoDB, for all CRUD operations performed on jobs.
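
To see why a per-process memory jobstore leads to split-brain, here is a small sketch using only the standard library, assuming a POSIX system with os.fork(). The dict named jobs is a hypothetical stand-in for an in-memory job store, not APScheduler's MemoryJobStore itself, and the job name and schedule are invented.

```python
import os

# Hypothetical stand-in for an in-memory job store living in each process.
jobs = {}


def add_job_in_child_only():
    """Add a job in a forked child and return (jobs seen by child, jobs seen by parent)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child ("worker"): the job is stored in this process's copy of the dict.
        os.close(read_fd)
        jobs["report"] = "every 5 minutes"
        os.write(write_fd, str(len(jobs)).encode())
        os.close(write_fd)
        os._exit(0)

    # Parent ("master"): its own copy of the dict was never touched.
    os.close(write_fd)
    child_jobs = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)
    return child_jobs, len(jobs)


if __name__ == "__main__":
    child_jobs, parent_jobs = add_job_in_child_only()
    print("jobs known to child:", child_jobs)
    print("jobs known to parent:", parent_jobs)
```

The child knows about one job and the parent about none, so a job added by a worker would never reach a scheduler running in the master. A shared store such as MongoDB avoids this because every process reads and writes the same data.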

This post may give you more detail, though it covers a Gunicorn (WSGI server) environment.

Factory answered 21/10, 2016 at 17:35 Comment(0)
R
0

Let’s say you want to run BackgroundScheduler in your application with the default job store and the default executor:

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

This will get you a BackgroundScheduler with a MemoryJobStore named “default” and a ThreadPoolExecutor named “default” with a default maximum thread count of 10.

Now, suppose you want to reduce the thread count, tweak the default values for new jobs, and set a different timezone. The following example shows the job defaults and the timezone (it leaves the executor, and so the thread count, at its default):

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=utc)

Rabbitry answered 24/3, 2021 at 19:58 Comment(0)
W
0

A working solution to the problem of BackgroundScheduler executing jobs multiple times.

The solution is in the code below. First, the essence of the problem.

By default, scheduler.start() raises an error if start() is called a second time on a scheduler that is already running. However, since Django runs its modules 1-2 times before the main launch, we may end up with different scheduler objects. In that case, calling start() again (on a different scheduler object) does not raise an error, and we can end up with several BackgroundSchedulers running simultaneously. Even if we solve the problem of different scheduler objects, another one awaits us:

If start() is called again on the same scheduler object (the same object in RAM) but in a different thread, it also succeeds and starts a new BackgroundScheduler thread. Why, if it is the same object in memory? I assume that when start() is called, APScheduler checks whether a BackgroundScheduler is running in the current thread, and in this case it is not. Therefore start() can succeed more than once on the same scheduler object if it is called from different threads. So why not allow start() to be called several times, but let only one scheduler object in only one thread act as the executor? That is the approach taken in the solution below.

So let's move on to the solution. Take the following project structure as an example:

project/
  core/
    wsgi.py
    scheduler.py # add this file
    job_executor.py # add this file
    ...
  app/
    scheduler_jobs.py # add this file
    signals.py 
    ...

# scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore


# Necessary for multi-threaded projects (if you use BackgroundScheduler).
# If this function is called in the thread in which the main scheduler was started,
#   it has no effect. If it is called in another thread,
#   it starts the scheduler only as a registrator.
# Call this function anywhere you want to add a job.
# It is only strictly needed in other threads.
def prepare_scheduler():
    if not scheduler.running:
        scheduler.start(paused=True)  # only registrator, not executor

# Interval for checking jobs registered in other threads (for their execution)
job_checking_interval = 10
execution_timeout = 10

# Allowed delay time for job execution
misfire_grace_time = job_checking_interval + execution_timeout


job_defaults = {'misfire_grace_time': misfire_grace_time}
scheduler = BackgroundScheduler(job_defaults=job_defaults)
scheduler.add_jobstore(DjangoJobStore(), 'default')

# job_executor.py

from threading import Thread
from time import sleep

from .scheduler import scheduler, job_checking_interval

# Periodically wake the scheduler so that it picks up jobs registered in other threads
def job_executor():
    while True:
        scheduler.wakeup()
        sleep(job_checking_interval)

def start_job_executor():
    Thread(target=job_executor, daemon=True).start()

# wsgi.py  (or another entry-point file of your framework)

# ... your code

from .job_executor import scheduler, start_job_executor

scheduler.start()  # registrator & executor
start_job_executor()

# scheduler_jobs.py

from core.scheduler import scheduler, prepare_scheduler

# Any job of yours
def scheduler_job():
    prepare_scheduler()  # Necessary for job registration from other threads
    scheduler.add_job(your_func, trigger=any_trigger)

# signals.py

# I took this file as an example 
# because signals usually run in a different thread

from django.db.models.signals import any_signal
from django.dispatch import receiver

from .models import YourModel
from .scheduler_jobs import scheduler_job

     
@receiver(any_signal, sender=YourModel)
def add_job(**kwargs):
    scheduler_job()

Now the scheduler starts only once as an executor (wsgi.py runs only once because it is the entry-point file, unlike other files) and works in parallel with the project.

At the same time, in other threads where the main scheduler was not started, new schedulers are started, but only as job registrators.

If you have more than one worker (for example, you launch your project through Gunicorn), then in addition to the above, add the “--preload” flag to your application launch command.

gunicorn --bind 127.0.0.1:8000 core.wsgi --preload

This way wsgi.py will only be run once, which is what we need.
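
The same setting can also live in a Gunicorn configuration file, which is plain Python. A minimal sketch, assuming a file named gunicorn.conf.py placed next to the project; the worker count of 4 is chosen only for illustration and is not taken from the answer.

```python
# gunicorn.conf.py (assumed file name)

# Address to listen on, matching the command line shown above.
bind = "127.0.0.1:8000"

# Number of worker processes (illustrative value).
workers = 4

# Load the application in the master before forking workers,
# which is what the --preload flag does.
preload_app = True
```

It could then be launched with something like gunicorn -c gunicorn.conf.py core.wsgi, so that the preload behaviour does not depend on remembering the command-line flag.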

In my opinion, this solves the problem.

Welcy answered 9/1, 2024 at 13:43 Comment(0)
