How can I set limit to the duration of a job with the APScheduler?
Asked Answered
B

2

8

I set the scheduler with the "max_instances=10".There can be 10 jobs to run concurrently.Sometimes some jobs blocked, it wsa hanging there.When more than 10 jobs werr blocking there, the exception of "skipped: maximum number of running instances reached(10)". Does APScheduler have a way to set the max time of a job's duration.If the job runs beyond the max time, it will be terminated. If it doesn't have the way, what should I do?

Barret answered 17/11, 2015 at 8:40 Comment(0)
F
8

APScheduler does not have a way to set the maximum run time of a job. This is mostly due to the fact that the underlying concurrent.futures package that is used for the PoolExecutors do not support such a feature. A subprocess could be killed but lacking the proper API, APScheduler would have to get a specialized executor to support this, not to mention an addition to the job API that allowed for timeouts. This is something to be considered for the next major version.

The question is, what do you want to do with the thread that is still running the job? Since threads cannot be forcibly terminated, the only option would be to let it run its course, but then it will still keep the thread busy.

Fireman answered 18/11, 2015 at 1:6 Comment(1)
any chance it made it to the next major version?Awful
B
1

I did a poor man's implementation for ThreadPoolExecutor below.

  • Consider this as non-authoritative example for your own studies

  • Decorate jobs with a controller function that wraps the actual job function

  • Have a way to call a check function (from another thread) to see if any of the tasks has exceeded its time

  • Go down in flames instead of trying to recover from the situation

  • Because we want to have hard crashes (any situation is unrecoverable), we also unwind the scheduler in the case any of the jobs raises an exception

This code contains some internal context I was too lazy to clean way.

Scheduler wrapper class:

"""Chain specific tasks scheduler."""
import logging
import datetime
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

from dex_ohlcv.processcontext import ProcessContext
from dex_ohlcv.timer import timed_task

logger = logging.getLogger(__name__)


class TaskTimeLimitExceeded(Exception):
    """One of background tasks have been busy for too long."""


@dataclass
class TaskControl:
    """Metadata about running backround tasks.

    Mostly used to diagnose/kill backgrond scheduler.
    """

    #: Task name
    name: str

    #: When started (UTC)
    started_at: datetime.datetime

    #: How long this task should run (UTC)
    max_duration: datetime.timedelta


class OracleScheduler:
    """Thread pool scheduler ask runner using APSSCheuler.

    One oracle can have two different schedulers
    """

    def __init__(self, context: ProcessContext):
        self.context = context
        self.died = False
        self.thread_count = 10
        self.schedular = self._create_scheduler()

        # Keep the list of active tasks in the memory so we can
        # die if any of tasks exceeds it maximum duration.
        # Tasks are keyed by the thread id of its runner.
        self.active_tasks: Dict[int, TaskControl] = {}

        assert context.chain_id

    def is_dead(self) -> bool:
        """Check if the scheduler has died because of an exception in tasks"""
        return self.died

    def _create_scheduler(self):
        """Build APSScheduler instance"""

        executors = {
            'default': ThreadPoolExecutor(self.thread_count),
        }

        def listen_error(event):
            if event.exception:
                logger.info("Scheduled task received exception. event: %s, execption: %s", event, event.exception)
            else:
                logger.error("Should not happen")

        scheduler = BackgroundScheduler(executors=executors, timezone=datetime.timezone.utc)
        scheduler.add_listener(listen_error, EVENT_JOB_ERROR)
        self.scheduler = scheduler
        return scheduler

    def die(self, exc: Exception):
        """"Shutdown the scheduler and mark an clean exit.

        Used by task control to abort in the case any tasks raise an exception.
        """
        logger.error("The background scheduler is dying because one of the tasks received exception: %s", exc)
        logger.exception(exc)
        self.scheduler.shutdown(wait=False)
        self.died = True

    def job(self,
            interval: datetime.timedelta,
            start_time=datetime.datetime(1970, 1, 1),
            max_instances=1,
            max_duration=datetime.timedelta(hours=2),
            ) -> Callable:
        """Decorator to create managed background jobs.

        - Decorated function takes one argument :py:class:`ProcessContext`:
          `task(context: ProcessContext)`.

        - All tasks are automatically tracked with :py:func:`timed_task`.

        - If any of the jobs fail with an excetion, shutdown the whole oracle by
          checking :py:meth:`is_dead` in the duty cycle loop.

        :param interval:
            How often the task should fire.

        :param start_time:
            Sets the start of the interval cycles. Passed to APScheduler add_job.

        :param max_instance:
            Limit the number of concurrent tasks. Passed to APScheduler add_job

        :parma max_duration:
            Max duration for this task.

            If exceeded, the background scheduler can unwind
            in :py:meth:`check_control_and_die`.
        """

        assert isinstance(interval, datetime.timedelta)

        context = self.context

        # https://mcmap.net/q/56603/-decorators-with-parameters
        def _outer(func):
            def _inner():

                task_id = threading.get_native_id()
                name = func.__name__

                task_control = TaskControl(
                    name,
                    datetime.datetime.utcnow(),
                    max_duration,
                )

                self.active_tasks[task_id] = task_control
                try:

                    started = datetime.datetime.utcnow()
                    with timed_task(f"oracle.scheduled_task.{name}",
                                    chain_id=context.chain_id.name,
                                    task_type="oracle_scheduled_task"):
                        res = func(context)

                    # Warn if tasks start to take too long
                    duration = datetime.datetime.utcnow() - started
                    if duration > interval:
                        logger.warning("Task %s took %s, more than interval %s", name, duration, interval)

                    return res
                except Exception as e:
                    self.die(e)
                finally:
                    del self.active_tasks[task_id]

            self.scheduler.add_job(
                _inner,
                'interval',
                seconds=interval.total_seconds(),
                start_date=start_time,
                max_instances=max_instances,
            )

        return _outer

    def get_exceeded_tasks(self) -> List[TaskControl]:
        """Check if any of the jobs have exceeded their time limit."""
        now_ = datetime.datetime.utcnow()
        return [tc for tc in self.active_tasks.values() if now_ > tc.started_at + tc.max_duration]

    def check_control_and_die(self):
        """Check if any of the tasks have exceeded their time limit and die

        Because we cannot abort thread code in CPython, just kill all other tasks
        and hope the stuck ask dies as well

        :raise TaskTimeLimitExceeded:
            If any of background tasks have exceeded
            their allocated life time.
        """
        stuck = self.get_exceeded_tasks()

        if not stuck:
            return

        logger.critical("Background tasks exceeded their time limits. Likely hung?")

        self.stop()

        # If we have multiple stuck tasks just raise for the first one
        error_msg = None
        for tc in stuck:
            exceeded = (datetime.datetime.utcnow() - tc.started_at) - tc.max_duration
            error_msg = f"Task {tc.name} is stuck. Started {tc.started_at}, limit {tc.max_duration}, exceeded time limit with {exceeded}"
            logger.error(error_msg)

        raise TaskTimeLimitExceeded(error_msg)

    def start(self):
        """Start the scheduler.

        Runs on a background thread and returns instantly.
        """
        jobs = self.scheduler.get_jobs()
        logger.info("Background scheduler starting. We have %d jobs", len(jobs))
        self.scheduler.start()

    def stop(self, wait=False):
        """Stop the scheduler."""
        self.scheduler.shutdown(wait=wait)

Example tests:

"""Oracle background scheduled tasks tests."""

import datetime
import time

import pytest

from dex_ohlcv.scheduler.scheduler import OracleScheduler, TaskTimeLimitExceeded


def test_job_success(test_context):
    """Background jobs success."""

    succeed = False

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        nonlocal succeed
        succeed = True

    scheduler.start()
    time.sleep(3)
    scheduler.stop()

    assert not scheduler.is_dead()
    assert succeed is True


def test_job_crashed(test_context):
    """Background jobs crashed."""

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1))
    def test_job(context: ProcessContext):
        raise RuntimeError("Oh no")

    scheduler.start()
    time.sleep(3)
    assert scheduler.is_dead()


def test_job_not_exceeded_time_limit(test_context):
    """Background jobs stay witin time limits.."""

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=30))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    time.sleep(1)
    assert not scheduler.get_exceeded_tasks()


def test_job_time_limit_exceeded(test_context):
    """Background jobs exceeds its time limit.."""

    scheduler = OracleScheduler(test_context)

    @scheduler.job(interval=datetime.timedelta(seconds=1), max_duration=datetime.timedelta(seconds=0))
    def test_job(context: ProcessContext):
        time.sleep(4)

    scheduler.start()
    time.sleep(1)

    with pytest.raises(TaskTimeLimitExceeded):
        scheduler.check_control_and_die()

Broeker answered 9/12, 2022 at 21:0 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.