I had a similar problem, where my scheduler process was a uWSGI MULE process and there was a separate app where I wanted to add new jobs.
Looking at the BaseScheduler's add_job()
function:
with self._jobstores_lock:
if not self.running:
self._pending_jobs.append((job, jobstore, replace_existing))
self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, replace_existing, True)
you can see the problem: the scheduler adds jobs only when it is already started.
The solution is fortunately quite simple, we should define our own "add-job-only" Scheduler:
class JobAddScheduler(BlockingScheduler):
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined,
coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default',
executor='default', replace_existing=False, **trigger_args):
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined)
job = Job(self, **job_kwargs)
# Add jobs to job store
with self._jobstores_lock:
self._real_add_job(job, jobstore, replace_existing, True)
return job
def start(self):
pass
def shutdown(self, wait=True):
pass
def _main_loop(self):
pass
def wakeup(self):
pass
Then we can add cron jobs instantaneously:
jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)
Don't forget the configure the scheduler! In my case I used SQLAlchemy-MySQL backend for storing jobs:
jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymsql://USER:PASSWORD@SERVER/DATABASE'))
jobscheduler.configure(jobstores=jobstores)
I'm not sure about the other jobstores, but after I added a new job, I had to call the wakeup()
function of the separate scheduler process to "active" the job. I achieved this using uWSGI's signal system.
with self._jobstores_lock
doesn't actually do the right thing, given that it's in a separate process. – Nightwear