Python APScheduler - How does AsyncIOScheduler work?
L

3

13

I'm having a hard time understanding how AsyncIOScheduler works and what makes it non-blocking.

If my job executes a blocking function, will AsyncIOScheduler block as well?

And what if I use AsyncIOScheduler with ThreadPoolExecutor? How does that work? Can I await the job execution?

Lastly answered 20/7, 2020 at 18:47 Comment(1)
The question should be updated to include desired behavior, a specific problem or error, and the shortest code necessary to reproduce the problem.Comte
L
10

So, in APScheduler there are 3 important components:

  1. The Scheduler
  2. The Executor(s)
  3. The Datastore(s)

For this question, only 1 and 2 are relevant.

The Scheduler simply decides when to call the jobs based on their trigger settings, such as an interval. AsyncIOScheduler uses asyncio to make the waiting period non-blocking. It runs in the same process and thread as the main program. This is very useful if your application is already running on an asyncio loop, since it saves the overhead of starting a new process or thread.
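To make the "non-blocking wait" idea concrete, here is a minimal sketch that uses only the standard library. It is not APScheduler's code: the wake_up callback and the delays are made up for illustration. It only shows that a timer registered on the event loop returns immediately and lets other coroutines run while it is pending.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    events = []

    def wake_up():
        # Hypothetical stand-in for the scheduler's "a job is due" callback.
        events.append("job due")

    # Ask the loop to call wake_up() later; this call returns immediately.
    loop.call_later(0.2, wake_up)

    # The rest of the application keeps running while the timer is pending.
    for i in range(3):
        events.append(f"app work {i}")
        await asyncio.sleep(0.01)

    # Stay alive long enough for the timer to fire.
    await asyncio.sleep(0.3)
    return events


events = asyncio.run(main())
print(events)
```

The application work is recorded before the timer callback, because waiting for the timer never occupies the thread.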

When a job needs to be executed, the Executor is called. By default, AsyncIOScheduler uses AsyncIOExecutor, which runs the job in the same thread and process as the Scheduler if the job function is declared with async def; otherwise it uses asyncio's run_in_executor, which runs the job in a thread pool.
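The dispatch rule described above can be sketched with plain asyncio. This is an illustration under assumptions, not the library's implementation: run_job, async_job and blocking_job are hypothetical names, and the thread identifiers are only there to show where each job ran.

```python
import asyncio
import inspect
import threading
import time


async def async_job():
    # Coroutine job: runs on the event loop thread.
    return threading.get_ident()


def blocking_job():
    # Plain function: would stall the loop if called directly on it.
    time.sleep(0.05)
    return threading.get_ident()


async def run_job(func):
    """Hypothetical dispatcher mirroring the rule described above."""
    loop = asyncio.get_running_loop()
    if inspect.iscoroutinefunction(func):
        return await func()
    # Passing None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, func)


async def main():
    loop_thread = threading.get_ident()
    async_thread = await run_job(async_job)
    blocking_thread = await run_job(blocking_job)
    return loop_thread, async_thread, blocking_thread


loop_thread, async_thread, blocking_thread = asyncio.run(main())
print(async_thread == loop_thread)     # coroutine ran on the loop thread
print(blocking_thread == loop_thread)  # blocking job ran on a worker thread
```

One consequence worth keeping in mind: a job declared with async def that calls a blocking function such as time.sleep still runs on the loop thread, so it blocks the loop until it returns.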

Which brings us to the last question: what happens if we use AsyncIOScheduler with ThreadPoolExecutor? Technically, it is the same as using the default executor with a non-async function: the job runs in a thread pool, while the scheduler remains in the main thread.

Lastly answered 29/7, 2020 at 16:3 Comment(1)
What if we want AsyncIOScheduler with a datastore? Any examples?Hammerhead
C
19

Using some Internet resources I found some useful facts. Hope it will help you.

A typical APScheduler instance houses tens of jobs, which execute regular Python functions. There is no limit on the number of jobs an APScheduler instance can schedule; it only depends on the actual load of the machine. By default, APScheduler stores all jobs in memory. If you want your jobs to survive process restarts and keep triggering from the last time they were triggered, you can store them in a database, such as any RDBMS, Redis, MongoDB, etc.

Depending on how your application runs, the scheduler can run as a thread, as an asyncio task, or in some other form. When initialized, APScheduler doesn't do anything unless you add Python functions as jobs. Once all the jobs are added, you need to "start" the scheduler. For a simple example of how to use APScheduler, here is a snippet of code that just works.

from urllib.request import urlopen
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job("interval", seconds=10)
def keep_warm():
    urlopen("https://enqueuezero.com", timeout=10)
    
scheduler.start()

This makes sure a URL is requested every 10 seconds. The program runs as a blocking process. If you want the scheduler to coexist with your application, consider using BackgroundScheduler, AsyncIOScheduler, etc.

Here is a code snippet for BackgroundScheduler.

from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()

The code below demonstrates how to use the asyncio-compatible scheduler to schedule a job that runs at 3-second intervals.

import asyncio
import os
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

You can get more examples about apscheduler using this link!

Checkrein answered 29/7, 2020 at 15:24 Comment(3)
How can the interval start exactly at the 00th second?Wabble
Is it OK to instantiate an AsyncIOScheduler in multiple classes, all running under a single event loop? I am trying to take into account both reliability and best practice. This is not for production code, just a side project, but I am fairly new to async in Python.Paneling
@alper, you can set next_run_time parameter of scheduler.scheduled_job(...) (or scheduler.add_job(...)) to (datetime.now() + timedelta(minutes=1)).replace(second=0, microsecond=0).Turbot
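As a small sketch of the expression in the comment above, the helper below computes the start of the next whole minute. The function name next_minute_start and the sample timestamp are made up for illustration; only the datetime arithmetic comes from the comment.

```python
from datetime import datetime, timedelta


def next_minute_start(now):
    """Return the first whole minute strictly after `now`."""
    return (now + timedelta(minutes=1)).replace(second=0, microsecond=0)


# Fixed sample timestamp so the result is reproducible.
example = datetime(2020, 7, 29, 15, 24, 37, 123456)
first_run = next_minute_start(example)
print(first_run)  # 2020-07-29 15:25:00
```

The result of next_minute_start(datetime.now()) can then be passed as next_run_time, as the comment suggests, so that the first run lands on second 00.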
S
9

According to the documentation, AsyncIOScheduler runs in an event loop.

It is non-blocking because it simply adds itself to the event loop and waits until you start the loop.

Once the event loop is started, the scheduler runs asynchronously.

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio

async def job():
    print('hi')

scheduler = AsyncIOScheduler()
scheduler.add_job(job, "interval", seconds=3)

scheduler.start()

asyncio.get_event_loop().run_forever()

Output

Run time of job "job (trigger: interval[0:00:03], next run at: 2020-07-27 14:06:39 -03)" was missed by 0:00:02.542515
hi
hi
Szabadka answered 27/7, 2020 at 17:12 Comment(0)
