I set up the scheduler with "max_instances=10", so up to 10 instances of a job can run concurrently. Sometimes some jobs block and just hang there. When more than 10 jobs are blocked, I get the error "skipped: maximum number of running instances reached(10)". Does APScheduler have a way to set a maximum duration for a job, so that a job running beyond that time is terminated? If it does not, what should I do?
APScheduler does not have a way to set the maximum run time of a job. This is mostly due to the fact that the underlying concurrent.futures package that is used for the PoolExecutors does not support such a feature. A subprocess could be killed, but lacking the proper API, APScheduler would have to get a specialized executor to support this, not to mention an addition to the job API that allowed for timeouts. This is something to be considered for the next major version.
The question is, what do you want to do with the thread that is still running the job? Since threads cannot be forcibly terminated, the only option would be to let it run its course, but then it will still keep the thread busy.
I wrote a poor man's implementation for ThreadPoolExecutor below.
Consider this a non-authoritative example for your own studies.
Decorate jobs with a controller function that wraps the actual job function
Have a way to call a check function (from another thread) to see if any of the tasks has exceeded its time
Go down in flames instead of trying to recover from the situation
Because we want to have hard crashes (any situation is unrecoverable), we also unwind the scheduler in the case any of the jobs raises an exception
This code contains some internal context I was too lazy to clean away.
Scheduler wrapper class:
"""Chain specific tasks scheduler."""
import logging
import datetime
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List
from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from dex_ohlcv.processcontext import ProcessContext
from dex_ohlcv.timer import timed_task
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TaskTimeLimitExceeded(Exception):
    """Raised when a background task has run longer than its allowed duration."""
@dataclass
class TaskControl:
    """Bookkeeping record for one running background task.

    Carries the task name, its start time and its allowed duration.
    """

    # Name of the task
    name: str

    # Moment the task started (UTC)
    started_at: datetime.datetime

    # Longest time the task is allowed to keep running
    max_duration: datetime.timedelta
class OracleScheduler:
    """Thread pool task runner built on APScheduler.

    Wraps a :py:class:`BackgroundScheduler` and tracks every running task so
    that a watchdog (:py:meth:`check_control_and_die`) can detect tasks that
    have exceeded their time limit. Any exception raised by a task shuts the
    whole scheduler down; poll :py:meth:`is_dead` to notice this.

    One oracle can have two different schedulers
    """

    def __init__(self, context: ProcessContext):
        """Create the scheduler; it is not started until :py:meth:`start`.

        :param context:
            Process context handed to every task. Must have a truthy
            ``chain_id``.
        """
        self.context = context
        self.died = False
        self.thread_count = 10

        # die() runs on worker threads while stop() runs on the caller's
        # thread, so the "is it running? then shut down" step is serialised.
        self._shutdown_lock = threading.Lock()

        self.scheduler = self._create_scheduler()
        # Misspelled alias kept so code that reads ``schedular`` keeps working.
        self.schedular = self.scheduler

        # Keep the list of active tasks in the memory so we can
        # die if any of tasks exceeds its maximum duration.
        # Tasks are keyed by the thread id of its runner.
        self.active_tasks: Dict[int, TaskControl] = {}

        assert context.chain_id

    @staticmethod
    def _utc_now() -> datetime.datetime:
        """Return the current UTC time as a naive datetime.

        Same value as the deprecated ``datetime.utcnow()``.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def is_dead(self) -> bool:
        """Check if the scheduler has died because of an exception in tasks"""
        return self.died

    def _create_scheduler(self):
        """Build the APScheduler instance.

        Also stores the instance on ``self.scheduler``.

        :return:
            A ``BackgroundScheduler`` in UTC with one thread pool executor of
            ``self.thread_count`` workers and an error listener attached.
        """
        executors = {
            'default': ThreadPoolExecutor(self.thread_count),
        }

        def listen_error(event):
            if event.exception:
                logger.info("Scheduled task received exception. event: %s, execption: %s", event, event.exception)
            else:
                logger.error("Should not happen")

        scheduler = BackgroundScheduler(executors=executors, timezone=datetime.timezone.utc)
        scheduler.add_listener(listen_error, EVENT_JOB_ERROR)
        self.scheduler = scheduler
        return scheduler

    def _shutdown(self, wait: bool) -> None:
        """Shut the scheduler down unless it is already stopped.

        APScheduler raises if ``shutdown()`` is called on a stopped
        scheduler; this makes repeated or concurrent shutdown requests safe.
        """
        with self._shutdown_lock:
            if self.scheduler.running:
                self.scheduler.shutdown(wait=wait)

    def die(self, exc: Exception):
        """Shutdown the scheduler and mark an unclean exit.

        Used by task control to abort in the case any tasks raise an exception.

        :param exc:
            The exception raised by the failing task.
        """
        # Flag first: is_dead() must report the failure even if the shutdown
        # below misbehaves.
        self.died = True
        logger.error("The background scheduler is dying because one of the tasks received exception: %s", exc)
        logger.exception(exc)
        self._shutdown(wait=False)

    def job(self,
            interval: datetime.timedelta,
            start_time=datetime.datetime(1970, 1, 1),
            max_instances=1,
            max_duration=datetime.timedelta(hours=2),
            ) -> Callable:
        """Decorator to create managed background jobs.

        - Decorated function takes one argument :py:class:`ProcessContext`:
          `task(context: ProcessContext)`.

        - All tasks are automatically tracked with :py:func:`timed_task`.

        - If any of the jobs fail with an exception, the scheduler shuts
          down; detect this by checking :py:meth:`is_dead` in the duty cycle
          loop.

        :param interval:
            How often the task should fire.

        :param start_time:
            Sets the start of the interval cycles. Passed to APScheduler add_job.

        :param max_instances:
            Limit the number of concurrent tasks. Passed to APScheduler add_job

        :param max_duration:
            Max duration for this task.
            If exceeded, the background scheduler can unwind
            in :py:meth:`check_control_and_die`.

        :return:
            A decorator that registers the function with the scheduler and
            returns the function unchanged.
        """
        assert isinstance(interval, datetime.timedelta)
        context = self.context

        def _outer(func):
            name = func.__name__

            def _inner():
                task_id = threading.get_native_id()
                started = self._utc_now()
                self.active_tasks[task_id] = TaskControl(name, started, max_duration)
                try:
                    with timed_task(f"oracle.scheduled_task.{name}",
                                    chain_id=context.chain_id.name,
                                    task_type="oracle_scheduled_task"):
                        res = func(context)

                    # Warn if tasks start to take too long
                    duration = self._utc_now() - started
                    if duration > interval:
                        logger.warning("Task %s took %s, more than interval %s", name, duration, interval)

                    return res
                except Exception as e:
                    self.die(e)
                finally:
                    # pop() instead of del: cleanup must never raise and hide
                    # the task's own outcome.
                    self.active_tasks.pop(task_id, None)

            self.scheduler.add_job(
                _inner,
                'interval',
                seconds=interval.total_seconds(),
                start_date=start_time,
                max_instances=max_instances,
            )
            # Hand the original function back so the decorated name is not
            # rebound to None.
            return func

        return _outer

    def get_exceeded_tasks(self) -> List[TaskControl]:
        """Check if any of the jobs have exceeded their time limit.

        :return:
            Controls of the tasks that are running right now and have been
            running for longer than their ``max_duration``.
        """
        now_ = self._utc_now()
        # Snapshot first: worker threads add and remove entries while we look.
        running = list(self.active_tasks.values())
        return [tc for tc in running if now_ > tc.started_at + tc.max_duration]

    def check_control_and_die(self):
        """Check if any of the tasks have exceeded their time limit and die

        Because we cannot abort thread code in CPython, just kill all other tasks
        and hope the stuck task dies as well

        :raise TaskTimeLimitExceeded:
            If any of background tasks have exceeded
            their allocated life time.
        """
        stuck = self.get_exceeded_tasks()
        if not stuck:
            return

        logger.critical("Background tasks exceeded their time limits. Likely hung?")
        self.stop()

        now_ = self._utc_now()
        messages = []
        for tc in stuck:
            exceeded = (now_ - tc.started_at) - tc.max_duration
            error_msg = f"Task {tc.name} is stuck. Started {tc.started_at}, limit {tc.max_duration}, exceeded time limit with {exceeded}"
            logger.error(error_msg)
            messages.append(error_msg)

        # If we have multiple stuck tasks just raise for the first one
        raise TaskTimeLimitExceeded(messages[0])

    def start(self):
        """Start the scheduler.

        Runs on a background thread and returns instantly.
        """
        jobs = self.scheduler.get_jobs()
        logger.info("Background scheduler starting. We have %d jobs", len(jobs))
        self.scheduler.start()

    def stop(self, wait=False):
        """Stop the scheduler.

        Does nothing if the scheduler is not running.

        :param wait:
            Passed to APScheduler ``shutdown``.
        """
        self._shutdown(wait=wait)
Example tests:
"""Oracle background scheduled tasks tests."""
import datetime
import time

import pytest

from dex_ohlcv.processcontext import ProcessContext
from dex_ohlcv.scheduler.scheduler import OracleScheduler, TaskTimeLimitExceeded
def test_job_success(test_context):
    """A healthy job gets executed and leaves the scheduler alive."""
    oracle_scheduler = OracleScheduler(test_context)
    succeed = False

    @oracle_scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        # Record that the scheduler really invoked us.
        nonlocal succeed
        succeed = True

    oracle_scheduler.start()
    time.sleep(3)
    oracle_scheduler.stop()

    assert not oracle_scheduler.is_dead()
    assert succeed is True
def test_job_crashed(test_context):
    """A job that raises marks the scheduler as dead."""
    oracle_scheduler = OracleScheduler(test_context)

    @oracle_scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        raise RuntimeError("Oh no")

    oracle_scheduler.start()

    # Give the one-second interval job time to fire.
    time.sleep(3)

    assert oracle_scheduler.is_dead()
def test_job_not_exceeded_time_limit(test_context):
    """Background jobs stay within time limits."""
    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=30))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    try:
        time.sleep(1)
        assert not scheduler.get_exceeded_tasks()
    finally:
        # Always stop the scheduler, otherwise the job keeps firing in the
        # background while later tests run.
        scheduler.stop()
def test_job_time_limit_exceeded(test_context):
    """Background job exceeds its time limit."""
    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=0))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()

    # The first run fires at the next one-second boundary, which can be up to
    # about 1 s away. Wait 2 s so the job has certainly started; it is still
    # inside its 4 s sleep at that point.
    time.sleep(2)

    # check_control_and_die() stops the scheduler itself before raising.
    with pytest.raises(TaskTimeLimitExceeded):
        scheduler.check_control_and_die()
© 2022 - 2025 — McMap. All rights reserved.