When using asyncio, how do you allow all running tasks to finish before shutting down the event loop?
S

10

81

I have the following code:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run.

This is the error:

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

How do I schedule a shutdown correctly?

To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully.

Sorghum answered 6/1, 2015 at 10:6 Comment(5)
To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully. - Sorghum
Have you tried yield from my_expensive_operation() followed, on the next line, by yield from asyncio.sleep(my_interval - timer() % my_interval) instead? - Mientao
I could just sleep for long enough that I know everything has finished, but this doesn't seem very clean. I was wondering if there was a way to schedule tasks and then run the loop until all scheduled tasks are complete. In JavaScript (Node.js), if the main program reaches the end but there are callbacks set, then the process runs until all callbacks are removed. - Sorghum
Oh sorry, I see what you mean: don't schedule with async, but make the current iteration wait until the previous operation has finished. It just feels like you should be able to do what I want to do (schedule tasks) and then wait until they are all finished. - Sorghum
Keep the futures returned by async() (and remove finished jobs). In principle, you could also get all current Task instances (there might be a class attribute). - Mientao
Y
74

You can retrieve the unfinished tasks and run the loop again until they have finished, then close the loop or exit your program.

pending = asyncio.all_tasks(loop)  # pass the loop explicitly: no loop is running at this point
loop.run_until_complete(asyncio.gather(*pending))
  • pending is the set of tasks on that loop that have not finished yet.
  • asyncio.gather() lets you wait on several tasks at once.
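
To make the two-step pattern above concrete, here is a minimal end-to-end sketch. The job() coroutine, the finished list and the 0.1 second delay are made up for illustration; only the all_tasks()/gather() part comes from the answer.

```python
import asyncio

finished = []  # hypothetical log, used only to show the order of completion


async def job(name, delay):
    await asyncio.sleep(delay)
    finished.append(name)


async def main():
    # Fire-and-forget: main() returns before the background job is done.
    asyncio.create_task(job("background", 0.1))
    finished.append("main")


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
    # Tasks that are still unfinished after main() returned.
    pending = asyncio.all_tasks(loop)
    if pending:
        # Run the loop a second time until they have all completed.
        loop.run_until_complete(asyncio.gather(*pending))
finally:
    loop.close()
```

Without the second run_until_complete() call the background job would still be pending when the loop is closed, which is the situation described in the question.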

If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:

async def do_something_periodically():
    while True:
        asyncio.create_task(my_expensive_operation())
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

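    # Exclude the current task, otherwise gather() would wait on itself forever.
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})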

Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:

async def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.create_task(my_expensive_operation()))
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await asyncio.gather(*tasks)
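
For the SIGTERM use case in the question, the shutdown flag could be an asyncio.Event that a signal handler sets. The following is only a sketch under a few assumptions: a Unix event loop (add_signal_handler is not available on Windows), a stand-in my_expensive_operation(), and a results list that exists purely so the effect can be observed.

```python
import asyncio
import signal


async def my_expensive_operation(results):
    # Stand-in for the real job (hypothetical): pretend to do some I/O.
    await asyncio.sleep(0.05)
    results.append("done")


async def do_something_periodically(interval, stop, results):
    tasks = set()
    while not stop.is_set():
        task = asyncio.create_task(my_expensive_operation(results))
        tasks.add(task)
        task.add_done_callback(tasks.discard)  # forget jobs once they finish
        try:
            # Sleep for one interval, but wake up early if stop gets set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    # No more scheduling; wait for the jobs that are still in flight.
    await asyncio.gather(*tasks, return_exceptions=True)


async def main(interval=5.0):
    stop = asyncio.Event()
    results = []
    # Unix only: set the event when the process receives SIGTERM.
    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop.set)
    await do_something_periodically(interval, stop, results)
    return results
```

Starting it with asyncio.run(main()) keeps scheduling jobs until SIGTERM arrives, then returns once the outstanding jobs are done. Removing finished tasks in the done callback keeps the set from growing while the monitor runs.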
Yellowwood answered 12/1, 2015 at 21:12 Comment(9)
Very helpful! Just a note about the second method: I think that each task you append to the list represents an open file descriptor; this means that on (say) Linux, you could hit your open file limit (ulimit -n) before the coroutine is finished. - Hedgehog
Hi, what do you mean by "represents"? AFAIK, tasks don't open file objects. - Yellowwood
I have found, using the second method, that I get error messages about having too many open file descriptors. I think that each task requires a file descriptor to work. Note that a "file descriptor" is not the same thing as an open file; they might also be those used by the select() call (which I believe the asyncio library uses). So if you have a user limit of a few thousand open file descriptors, and many more tasks than that, you may encounter problems. - Hedgehog
Having said that, I haven't actually confirmed that this is the problem, because I found other ways to solve it. The "too many file descriptors" error could have been related to some other mistake I made. So I could well be wrong about this. - Hedgehog
I can confirm that the only file descriptors opened by asyncio for its own use are the selector and a self-pipe, so 3 file descriptors. A Task object doesn't hold any resource object by itself, so it must be an unrelated bug. - Yellowwood
Note that asyncio.gather() will not cancel the remaining tasks if one of them fails because of an exception. The remaining tasks will be left in the incomplete state and will not run exception handlers to clean up. Tasks that you pass to gather() may also create new tasks that will not be waited for. You would need to either manually track all started tasks and cancel them in exception handlers, or repeatedly call gather() until there are none left (or cancel newly started ones). - Brave
Doesn't your second example create a deadlock? The main task awaits all the other tasks, but it is itself one of those tasks, so that can never finish. Right? - Sarazen
Also, it's probably a good idea to pass return_exceptions=True to asyncio.gather(). The default behavior, if it's omitted, is bad: it will propagate the first exception that's raised and allow the other tasks to keep running. - Sarazen
I have a question: if I have 1 billion tasks to run and I set a Semaphore to 100, will asyncio.all_tasks() give me 1 billion tasks or 100 tasks? - Signesignet
R
35

I noticed some answers suggested using asyncio.gather(*asyncio.all_tasks()), but that can hang forever: the set returned by all_tasks() includes asyncio.current_task(), so the coroutine ends up waiting for itself. Some answers suggested complicated workarounds involving checking coro names or len(asyncio.all_tasks()), but it turns out to be very simple if you take advantage of set operations:

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all other tasks to finish other than the current task i.e. main().
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
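
One caveat raised in the comments on another answer is that tasks may spawn further tasks while you are waiting, and that a single failing task makes a plain gather() return early. A possible way to cover both is to repeat the set-difference trick in a loop and pass return_exceptions=True. This is a sketch: wait_for_other_tasks(), child() and grandchild() are names invented for the example.

```python
import asyncio

log = []  # hypothetical log, only used to show the order of completion


async def wait_for_other_tasks():
    """Wait until no task other than the caller is left on the loop."""
    while True:
        others = asyncio.all_tasks() - {asyncio.current_task()}
        if not others:
            return
        # return_exceptions=True keeps one failing task from
        # aborting the wait for the remaining ones.
        await asyncio.gather(*others, return_exceptions=True)


async def grandchild():
    await asyncio.sleep(0.05)
    log.append("grandchild")


async def child():
    await asyncio.sleep(0.05)
    asyncio.create_task(grandchild())  # created after the first snapshot
    log.append("child")


async def main():
    asyncio.create_task(child())
    await wait_for_other_tasks()
    log.append("main")


asyncio.run(main())
```

The second pass through the loop is what picks up grandchild(), which did not exist yet when the first snapshot of all_tasks() was taken.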
Ria answered 3/8, 2021 at 3:1 Comment(1)
This works great! Expanded example:
```
async def main():
    await longrunning_task()
    while True:
        all_tasks = asyncio.all_tasks()
        await asyncio.sleep(1)
        cur_task = asyncio.current_task()
        excl_cur_task = (all_tasks - {cur_task})
        if len(excl_cur_task) < 1:
            return
        res = await asyncio.gather(*excl_cur_task)

if __name__ == "__main__":
    asyncio.run(main())
```
- Chipmunk
X
18

As of Python 3.7, the above answer uses several deprecated APIs (asyncio.async, Task.all_tasks, @asyncio.coroutine, yield from, etc.), and you should use this instead:

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        await asyncio.sleep(interval)


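# new_event_loop() avoids the deprecated implicit loop creation of get_event_loop().
loop = asyncio.new_event_loop()
coro = do_something_periodically(1, 1)

try:
    loop.run_until_complete(coro)
except KeyboardInterrupt:
    coro.close()
    tasks = asyncio.all_tasks(loop)
    # Task.get_coro() (Python 3.8+) is the public replacement for the private task._coro.
    expensive_tasks = {task for task in tasks if task.get_coro().__name__ != coro.__name__}
    loop.run_until_complete(asyncio.gather(*expensive_tasks))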
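
A variant without manual loop handling, offered as a sketch rather than a drop-in replacement: as far as I know, since Python 3.11 asyncio.run() reacts to Ctrl-C by cancelling the main task, so the periodic coroutine can catch CancelledError and finish the in-flight jobs itself. Treat that version detail as an assumption and check it for your Python. The log parameter is added only so the behaviour can be observed.

```python
import asyncio


async def my_expensive_operation(expense, log):
    log.append("start")
    await asyncio.sleep(expense)
    log.append("end")


async def do_something_periodically(expense, interval, log):
    jobs = set()
    try:
        while True:
            job = asyncio.create_task(my_expensive_operation(expense, log))
            jobs.add(job)
            job.add_done_callback(jobs.discard)  # forget finished jobs
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # Cancelled (for example by Ctrl-C under asyncio.run): stop
        # scheduling and let the jobs already in flight finish.
        await asyncio.gather(*jobs, return_exceptions=True)
        raise
```

It would be started with asyncio.run(do_something_periodically(1, 1, [])) wrapped in try/except KeyboardInterrupt. Re-raising CancelledError after the cleanup keeps the cancellation visible to the caller.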
Xylotomous answered 26/8, 2018 at 10:2 Comment(0)
O
4

Use a wrapper coroutine that waits until the pending task count is 1 before returning.

async def loop_job():
    asyncio.create_task(do_something_periodically())
    # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks().
    while len(asyncio.all_tasks()) > 1:  # Any task besides loop_job() itself?
        await asyncio.sleep(0.2)

asyncio.run(loop_job())
Oriflamme answered 9/7, 2019 at 3:0 Comment(0)
K
3

I'm not sure if this is what you've asked for but I had a similar problem and here is the ultimate solution that I came up with.

The code is python 3 compatible and uses only public asyncio APIs (meaning no hacky _coro and no deprecated APIs).

import asyncio

async def fn():
  await asyncio.sleep(1.5)
  print('fn')

async def main():
    print('main start')
    asyncio.create_task(fn()) # run in parallel
    await asyncio.sleep(0.2)
    print('main end')


def async_run_and_await_all_tasks(main):
  def get_pending_tasks():
      tasks = asyncio.all_tasks()  # Task.all_tasks() was removed in Python 3.9
      pending = [task for task in tasks if task != run_main_task and not task.done()]
      return pending

  async def run_main():
      await main()

      while True:
          pending_tasks = get_pending_tasks()
          if len(pending_tasks) == 0: return
          await asyncio.gather(*pending_tasks)

  loop = asyncio.new_event_loop()
  run_main_coro = run_main()
  run_main_task = loop.create_task(run_main_coro)
  loop.run_until_complete(run_main_task)

# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)

output (as expected):

main start
main end
fn

The async_run_and_await_all_tasks function makes Python behave the way Node.js does: the program exits only when there are no unfinished tasks.

Katar answered 25/8, 2019 at 7:56 Comment(0)
S
2

You might also consider using asyncio.shield. It won't let ALL the running tasks finish, only the shielded ones, but it can still be useful in some scenarios.

Besides that, as of Python 3.7 we can also use the high-level API function asyncio.run here, as Python core developer Yury Selivanov suggests: https://youtu.be/ReXxO_azV-w?t=636
Note: asyncio.run was added to asyncio in Python 3.7 on a provisional basis and has been part of the stable API since Python 3.8.

Hope that helps!

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        # using asyncio.shield
        await asyncio.shield(asyncio.sleep(interval))


coro = do_something_periodically(1, 1)

if __name__ == "__main__":
    try:
        # using asyncio.run
        asyncio.run(coro)
    except KeyboardInterrupt:
        print('Cancelled!')
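
To show what shield actually protects, here is a small sketch with made-up names (important(), outer()): cancelling the awaiting task does not cancel the shielded inner task, which can still be awaited afterwards.

```python
import asyncio


async def important(log):
    await asyncio.sleep(0.1)
    log.append("important finished")


async def outer(inner):
    # Cancelling outer() cancels this await, but not the shielded task.
    await asyncio.shield(inner)


async def main():
    log = []
    inner = asyncio.create_task(important(log))
    task = asyncio.create_task(outer(inner))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("outer cancelled")
    # The shielded task is still running, so it can be awaited to the end.
    await inner
    log.append("done")
    return log
```

Note that the shielded task only finishes because main() keeps a reference to it and awaits it before returning; shield on its own does not make the loop wait.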
Symbolist answered 2/2, 2019 at 20:57 Comment(0)
P
1

If you want a clean way to await on all running tasks created within some local scope without leaking memory (and while preventing garbage collection errors), you can maintain a set of running tasks and use task.add_done_callback(...) to remove the task from the set. Here is a class that handles this for you:

import asyncio
from asyncio import Task
from typing import Coroutine


class TaskSet:
    def __init__(self):
        self.tasks = set()

    def add(self, coroutine: Coroutine) -> Task:
        task = asyncio.create_task(coroutine)
        self.tasks.add(task)
        task.add_done_callback(lambda _: self.tasks.remove(task))
        return task

    def __await__(self):
        return asyncio.gather(*self.tasks).__await__()

Which can be used like this:

async def my_function():
    await asyncio.sleep(0.5)


async def go():
    tasks = TaskSet()
    for i in range(10):
        tasks.add(my_function())
    await tasks
Plotter answered 4/3, 2021 at 9:59 Comment(0)
N
1
import asyncio

async def coroutine_to_run(timetosleepinseconds):
    print(await asyncio.sleep(timetosleepinseconds, result=f"I have finished in {timetosleepinseconds} seconds"))
    ## Do your stuff

async def main():
    tasks = [asyncio.create_task(coroutine_to_run(timetosleepinseconds=2)), asyncio.create_task(coroutine_to_run(timetosleepinseconds=3))]
    await asyncio.gather(*tasks)

asyncio.run(main())
Newspaper answered 3/8, 2022 at 18:36 Comment(0)
I
0

My use case has some main tasks that spawn short-lived tasks. This answer nicely exits immediately on seeing the main tasks finish (and some transient tasks as well) however I wanted a tidy up for other tasks. A time delay wouldn't work (as additional tasks may be created) so actively using .cancel() seemed the right choice.

Code is:

import asyncio

MAX_TASKS = 10
task_maker_count = 0

async def task_maker():
    global task_maker_count
    task_maker_count += 1
    if len(asyncio.all_tasks()) < MAX_TASKS:
        asyncio.create_task(task_maker())
        asyncio.create_task(task_maker())

async def main_task():
    asyncio.create_task(task_maker())
    await asyncio.sleep(2.0)

async def main():
    global task_maker_count
    asyncio.create_task(main_task())
    asyncio.create_task(main_task())

    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
    for task in [*asyncio.all_tasks() - {asyncio.current_task()}]:
        task.cancel()
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()},
                         return_exceptions=True)  # needed for CancelledError
    print(f'{task_maker_count} task_maker tasks created')

if __name__ == '__main__':
    asyncio.run(main())

Result on my computer is:

194672 task_maker tasks created

Not specifically relevant, but bumping MAX_TASKS into the thousands dramatically reduces the number of tasks completed.
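
A related variation, in case a grace period is acceptable for your use case: give the remaining tasks a limited time to finish and cancel only the stragglers. This is a sketch; drain() is a hypothetical helper name, not an asyncio API, and it takes a single snapshot, so tasks created during the grace period are not covered.

```python
import asyncio


async def drain(timeout):
    """Wait up to `timeout` seconds for the other tasks, then cancel the rest.

    Returns the number of tasks that had to be cancelled.
    """
    others = asyncio.all_tasks() - {asyncio.current_task()}
    if not others:
        return 0  # asyncio.wait() rejects an empty set
    done, pending = await asyncio.wait(others, timeout=timeout)
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to run their cleanup handlers;
    # return_exceptions=True swallows the resulting CancelledError.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```

Calling await drain(2.0) at the end of main() would wait at most two seconds before cancelling whatever is left.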

Interstratify answered 25/6, 2022 at 1:50 Comment(0)
T
0

Inspired by @matthew above, I wrote a similar class using task groups (asyncio.TaskGroup requires Python 3.11 or later):

import asyncio


class TaskRunner:
    tasks = []

    @classmethod
    def add(cls, coroutine):
        cls.tasks.append(coroutine)

    @classmethod
    async def run(cls):
        async with asyncio.TaskGroup() as tg:
            for task in cls.tasks:
                tg.create_task(task)

Usage:

TaskRunner.add(my_coroutine_function())
TaskRunner.add(my_coroutine_function())
TaskRunner.add(my_coroutine_function())

asyncio.run(TaskRunner.run())
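
For the periodic case in the question, a task group can also be used directly: leaving the async with block waits for every task created in the group, including ones added while the block is running. This is a sketch assuming Python 3.11+; job() and the results list are invented for the example.

```python
import asyncio


async def job(n, results):
    await asyncio.sleep(0.05)
    results.append(n)


async def main():
    results = []
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        for n in range(3):
            tg.create_task(job(n, results))
            await asyncio.sleep(0.01)  # stand-in for the scheduling interval
    # Leaving the block waits for every task created in the group.
    return results
```

Unlike a bare gather(), a task group cancels the remaining tasks if one of them raises, which may or may not be what you want during shutdown.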
Tapp answered 12/3, 2024 at 15:26 Comment(0)
