APScheduler how to add job outside the scheduler?

I have a simple requirement. I am running APScheduler as a separate process. I have another job producer script from which I want to add a job to the scheduler and run it.

This is my scheduler code,

# appsched.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()

This is my job producer script,

# jobproducer.py
from appsched import scheduler

def say_hello_job():
    print "Hello"

scheduler.add_job(say_hello_job, 'interval', minutes=1)

Needless to say, this did not work. Is there a way to make this work, perhaps by using a job store? How do I share a scheduler with multiple different job producers?

Kidnap answered 3/9, 2015 at 10:58

I had a similar problem: my scheduler process was a uWSGI mule process, and there was a separate app from which I wanted to add new jobs.

Looking at BaseScheduler's add_job() method:

with self._jobstores_lock:
    if not self.running:
        self._pending_jobs.append((job, jobstore, replace_existing))
        self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
    else:
        self._real_add_job(job, jobstore, replace_existing, True)

you can see the problem: the job only reaches the job store when the scheduler is already running; otherwise it just sits in the _pending_jobs list.
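A quick way to see this behaviour (purely illustrative; it pokes at the private _pending_jobs attribute, which is an implementation detail):

from apscheduler.schedulers.blocking import BlockingScheduler

s = BlockingScheduler()
s.add_job(lambda: None, 'interval', minutes=1)
# The job never reaches a job store; it only sits in the pending list
# until start() is called.
print(s._pending_jobs)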

Fortunately the solution is quite simple: define our own "add-job-only" scheduler that writes jobs straight to the job store:

import six

from apscheduler.job import Job
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.util import undefined


class JobAddScheduler(BlockingScheduler):
    """A scheduler that only adds jobs to the job store and never runs them."""

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs)
                          if value is not undefined)
        job = Job(self, **job_kwargs)

        # Write the job to the job store right away, bypassing the
        # "is the scheduler running?" check in BaseScheduler.add_job()
        with self._jobstores_lock:
            self._real_add_job(job, jobstore, replace_existing, True)

        return job

    # This scheduler never runs anything, so the run-related methods are no-ops
    def start(self):
        pass

    def shutdown(self, wait=True):
        pass

    def _main_loop(self):
        pass

    def wakeup(self):
        pass

Then we can add jobs immediately:

jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)

Don't forget to configure the scheduler! In my case I used the SQLAlchemy job store with a MySQL backend for storing jobs:

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = dict(default=SQLAlchemyJobStore(url='mysql+pymysql://USER:PASSWORD@SERVER/DATABASE'))
jobscheduler.configure(jobstores=jobstores)
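For this to actually do anything, the separate scheduler process has to point at the same job store. A minimal sketch, assuming the same MySQL URL as above and reusing the appsched.py name from the question:

# appsched.py -- sketch: the real scheduler executes jobs from the shared job store
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = dict(default=SQLAlchemyJobStore(url='mysql+pymysql://USER:PASSWORD@SERVER/DATABASE'))
scheduler = BlockingScheduler(jobstores=jobstores)
scheduler.start()  # blocks and runs whatever jobs are in the store

Note that the job function has to be importable by this process under the same reference, since the job store only persists a textual reference to it, not the function itself.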

I'm not sure about the other job stores, but after adding a new job I had to call the wakeup() function of the separate scheduler process to "activate" the job. I achieved this using uWSGI's signal system.
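A rough sketch of how that wakeup could be wired up with uWSGI signals; the signal number, the "mule1" target and the handler name are all assumptions, not part of the original setup, and scheduler refers to the running BlockingScheduler in the mule process:

# Hypothetical sketch of the wakeup wiring, run inside the mule process
import uwsgi

WAKEUP_SIGNAL = 99  # any free uWSGI signal number (assumption)

def wakeup_scheduler(signum):
    # Tell the running scheduler to re-read the job store and recompute wakeups
    scheduler.wakeup()

uwsgi.register_signal(WAKEUP_SIGNAL, "mule1", wakeup_scheduler)

# In the web app, right after jobscheduler.add_job(...):
#     uwsgi.signal(WAKEUP_SIGNAL)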

Colchicine answered 18/10, 2015 at 14:23
I'm pretty sure with self._jobstores_lock doesn't actually do the right thing, given that it's in a separate process. - Nightwear
