Python asyncio.create_task() - really need to keep a reference?

The documentation of asyncio.create_task() states the following warning:

Important: Save a reference to the result of this function, to avoid a task disappearing mid execution.

My question is: Is this really true?

I have several IO-bound "fire and forget" tasks which I want to run concurrently using asyncio by submitting them to the event loop using asyncio.create_task(). However, I do not really care about the return value of the coroutine or even whether they run successfully, only that they do run eventually. One use case is writing data from an "expensive" calculation back to a Redis database. If Redis is available, great. If not, oh well, no harm. This is why I do not want/need to await those tasks.

Here is a generic example:

import asyncio

async def fire_and_forget_coro():
    """Some random coroutine waiting for IO to complete."""
    print('in fire_and_forget_coro()')
    await asyncio.sleep(1.0)
    print('fire_and_forget_coro() done')


async def async_main():
    """Main entry point of asyncio application."""
    print('in async_main()')
    n = 3
    for _ in range(n):
        # create_task() does not block, returns immediately.
        # Note: We do NOT save a reference to the submitted task here!
        asyncio.create_task(fire_and_forget_coro(), name='fire_and_forget_coro')

    print('awaiting sleep in async_main()')
    await asyncio.sleep(2.0) # <-- note this line
    print('sleeping done in async_main()')

    print('async_main() done.')

    # all references of tasks we *might* have go out of scope when returning from this coroutine!
    return

if __name__ == '__main__':
    asyncio.run(async_main())

Output:

in async_main()
awaiting sleep in async_main()
in fire_and_forget_coro()
in fire_and_forget_coro()
in fire_and_forget_coro()
fire_and_forget_coro() done
fire_and_forget_coro() done
fire_and_forget_coro() done
sleeping done in async_main()
async_main() done.

When commenting out the await asyncio.sleep() line, we never see fire_and_forget_coro() finish. This is to be expected: when the event loop started with asyncio.run() closes, tasks will not be executed anymore. But it appears that as long as the event loop is still running, all tasks will be taken care of, even when I never explicitly created references to them. This seems logical to me, as the event loop itself must have a reference to all scheduled tasks in order to run them. And we can even get them all using asyncio.all_tasks()!
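To check the observation above, here is a minimal sketch (the coroutine name, task count and timings are illustrative) that creates three unreferenced tasks and counts them in asyncio.all_tasks(). Note that it only shows that the running loop can find the tasks; it says nothing about whether the loop's bookkeeping holds strong or weak references to them.

```python
import asyncio


async def fire_and_forget_coro() -> None:
    """Stand-in for the IO-bound coroutine from the question."""
    await asyncio.sleep(0.1)


async def count_visible_tasks() -> int:
    for _ in range(3):
        # No reference is kept, exactly as in the question.
        asyncio.create_task(fire_and_forget_coro(), name='fire_and_forget_coro')
    # all_tasks() returns the not-yet-done tasks of the running loop
    # (this also includes the task running count_visible_tasks() itself).
    names = [t.get_name() for t in asyncio.all_tasks()]
    await asyncio.sleep(0.2)  # give the background tasks time to finish
    return names.count('fire_and_forget_coro')


if __name__ == '__main__':
    print(asyncio.run(count_visible_tasks()))  # prints 3
```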

So, I think I can trust Python to have at least one strong reference to every scheduled task as long as the event loop it was submitted to is still running, and thus I do not have to manage references myself. But I would like a second opinion here. Am I right or are there pitfalls I have not yet recognized?

If I am right, why the explicit warning in the documentation? It is a usual Python thing that stuff is garbage-collected if you do not keep a reference to it. Are there situations where one does not have a running event loop but still some task objects to reference? Maybe when creating an event loop manually (never did this)?

Tad asked 20/4, 2022 at 11:25

I just found an open issue about this topic on the CPython bug tracker on GitHub: https://github.com/python/cpython/issues/88831

Quote:

asyncio will only keep weak references to alive tasks (in _all_tasks). If a user does not keep a reference to a task and the task is not currently executing or sleeping, the user may get "Task was destroyed but it is pending!".

So the answer to my question is, unfortunately, yes. One has to keep around a reference to the scheduled task.

However, the GitHub issue also describes a relatively simple workaround: keep all running tasks in a set() and add a done callback to each task which removes it from the set() again.

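import asyncio

running_tasks = set()
# [...]
# inside a coroutine running on the event loop:
task = asyncio.create_task(some_background_function())
running_tasks.add(task)  # strong reference keeps the task alive
task.add_done_callback(lambda t: running_tasks.remove(t))  # drop it once done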
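A self-contained sketch of the same workaround, assuming Python 3.9+ for the set[...] and list[...] annotations. The names background_tasks, fire_and_forget() and some_background_function() are hypothetical. It passes set.discard as a bound method (the style suggested in the comments, with discard as used in the asyncio docs), so the callback cannot raise if the task was already removed:

```python
import asyncio

# Module-level set holding strong references to in-flight background tasks.
background_tasks: set[asyncio.Task] = set()


async def some_background_function(results: list[int], i: int) -> None:
    """Hypothetical placeholder for fire-and-forget work (e.g. a Redis write)."""
    await asyncio.sleep(0.01)
    results.append(i)


def fire_and_forget(coro) -> asyncio.Task:
    """Schedule coro and keep its task alive until it finishes (hypothetical helper)."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference
    task.add_done_callback(background_tasks.discard)  # released when done
    return task


async def main() -> list[int]:
    results: list[int] = []
    for i in range(3):
        fire_and_forget(some_background_function(results, i))
    await asyncio.sleep(0.1)  # the caller never awaits the tasks directly
    return sorted(results)


if __name__ == '__main__':
    print(asyncio.run(main()))    # prints [0, 1, 2]
    print(len(background_tasks))  # prints 0
```

The caller never awaits the tasks; the set is what keeps them alive until their done callbacks remove them.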
Tad answered 21/4, 2022 at 15:1
I've read through this GitHub issue, your pull request and a discussion on Hacker News, and I still cannot manage to understand it. To me, your reasoning in the question makes a lot of sense, but the docs and the discussion in the original PR do not. There are still no examples of how to reproduce it. For now, I am going to believe that you were correct, and the person who wrote the original statement in the docs just overestimated the seriousness of the problem. - Lewd
@AndriiMaletskyi: You are right, the fact that Python could garbage-collect running tasks never exploded in my face in practice. I think this is due to the fact that most of my "async fire-and-forget tasks" are short-lived and finished by the time the garbage collector gets around to cleaning them up. However, I am a big fan of "defensive programming": never assume that something will work as you expect if the documentation doesn't explicitly say so and a wrong assumption might explode in your face at the least convenient time. :-) This is why I started researching this subject in the first place. - Tad
Just a small style improvement for the last line: task.add_done_callback(running_tasks.remove) - Ellery
"... does not keep a reference to a task and the task is not currently executing or sleeping" What would these situations actually be? "executing or sleeping" should already cover most situations. If one excludes "waiting on synchronisation" from sleeping, then a) asyncio synchronisation primitives do keep references to their waiters and b) if an asyncio synchronisation primitive is garbage collected, it cannot wake up its waiters anyway. So the only situation in which a task should be garbage collected is when it cannot ever wake up again... - Gezira

As I realized that more and more people (including me) struggle to understand why they need to keep references to their tasks when their code already works just fine, I want to explain what's going on behind the scenes: which references exist at each step, when such code works, and when it doesn't.

If you want to follow along: my interpreter version is 3.11.

Let's start by creating a task. When we pass a coroutine to asyncio.create_task(), it creates a Task object (in BaseEventLoop.create_task(), Lib/asyncio/base_events.py):

...
            task = tasks.Task(coro, loop=self, name=name, context=context)
...

In the Task's initializer (Task.__init__() in Lib/asyncio/tasks.py), we have these lines:

...
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)
...

The second line registers the task in a WeakSet, which holds only a weak reference to the task. Weak references don't count towards the reference count, so this registration does not keep the task alive.
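To illustrate what "weak" means here, a minimal sketch with a plain weakref.WeakSet. FakeTask is a hypothetical stand-in for a Task, and this does not touch asyncio's private registry:

```python
import gc
import weakref


class FakeTask:
    """Hypothetical stand-in for asyncio.Task; any weak-referenceable object works."""


def weakset_demo() -> tuple[int, int]:
    registry = weakref.WeakSet()  # plays the role of asyncio's task registry
    task = FakeTask()
    registry.add(task)
    before = len(registry)        # 1: the object is alive
    del task                      # drop the only strong reference
    gc.collect()                  # not needed in CPython, but keeps the demo robust
    after = len(registry)         # 0: the WeakSet did not keep it alive
    return before, after


if __name__ == '__main__':
    print(weakset_demo())  # prints (1, 0)
```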

See that self.__step? It's a callback: a bound method, which holds a reference to the newly created Task object. We pass it to call_soon(), which wraps the callback in a Handle object and schedules it for execution (in Lib/asyncio/base_events.py):

...
        handle = events.Handle(callback, args, self, context)
        ...
        self._ready.append(handle)
...
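The claim that a bound method such as self.__step keeps its Task alive can be checked in isolation. In this sketch FakeTask is hypothetical, and deleting callback stands in for the Handle being popped off _ready and discarded:

```python
import gc
import weakref


class FakeTask:
    """Hypothetical stand-in for asyncio.Task."""

    def step(self) -> None:
        pass


def bound_method_keeps_instance_alive() -> tuple[bool, bool]:
    task = FakeTask()
    ref = weakref.ref(task)
    callback = task.step          # bound method: callback.__self__ is task
    del task                      # the callback is now the only strong reference
    gc.collect()
    alive_via_callback = ref() is not None
    del callback                  # like the Handle being discarded after it ran
    gc.collect()
    collected = ref() is None
    return alive_via_callback, collected


if __name__ == '__main__':
    print(bound_method_keeps_instance_alive())  # prints (True, True)
```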

What is self._ready? It's the queue from which the event loop takes the callbacks it runs (in BaseEventLoop._run_once()):

...
            handle = self._ready.popleft()
            ...
            else:
                handle._run()
...

So up to this point, before our callback is popped off the queue, the task is referenced through this chain:

eventloop -> self._ready -> Handle -> callback -> task object

There is a strong reference to our task object which prevents it from being garbage collected.

So far, so good. But what happens after one iteration of the event loop? Our callback is no longer in self._ready, so is the only strong reference now gone? Wait...

Inside our task we usually await something, most of the time a Future object returned by an IO call. When our callback runs, it drives the coroutine until it yields that Future object (in Task.__step()):

...
                result = coro.send(None)
...
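To see what "yields a Future" means, you can play the role of Task.__step() by hand. This is only a sketch: waits_on() and drive_manually() are hypothetical names, and a real Task additionally registers its wakeup callback and checks the Future's _asyncio_future_blocking flag, which this skips.

```python
import asyncio


async def waits_on(fut: asyncio.Future) -> int:
    return await fut


def drive_manually() -> tuple[bool, int]:
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        coro = waits_on(fut)
        # First step: run the coroutine until it suspends; it yields the Future.
        yielded = coro.send(None)
        fut.set_result(42)
        # Second step: the Future is done, so the coroutine finishes and
        # its return value arrives as StopIteration.value.
        try:
            coro.send(None)
        except StopIteration as stop:
            return yielded is fut, stop.value
        raise AssertionError("coroutine did not finish")
    finally:
        loop.close()


if __name__ == '__main__':
    print(drive_manually())  # prints (True, 42)
```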

asyncio receives that Future and adds a callback to its "done callbacks" (also in Task.__step()):

                        result.add_done_callback(
                            self.__wakeup, context=self._context)

Again, that callback (the bound method self.__wakeup) holds a reference to the Task object. Now here is the most important part of this post:

The yielded Future object has a reference to the Task object. But can the Future itself survive? What references the Future object? As long as the Future has strong references, our Task can live without any problem; otherwise our Task is going to be garbage collected as well.
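A minimal sketch of the "Future -> done callbacks -> Task" link, using a hypothetical FakeTask whose bound method is registered with Future.add_done_callback(). The owner of the callback stays alive exactly as long as the pending Future does:

```python
import asyncio
import gc
import weakref


class FakeTask:
    """Hypothetical stand-in for asyncio.Task."""

    def wakeup(self, fut: asyncio.Future) -> None:
        pass


async def future_keeps_callback_owner_alive() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    task = FakeTask()
    ref = weakref.ref(task)
    fut.add_done_callback(task.wakeup)  # fut -> done callbacks -> bound method -> task
    del task
    gc.collect()
    alive_while_future_alive = ref() is not None
    del fut                             # nothing else references the pending Future
    gc.collect()
    collected_with_future = ref() is None
    return alive_while_future_alive, collected_with_future


if __name__ == '__main__':
    print(asyncio.run(future_keeps_callback_owner_alive()))  # prints (True, True)
```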

I'm going to show three scenarios to see that in action.

1. Future object can't survive:

Suppose we create the Future object inside our Task:

import asyncio
import gc


async def coro1():
    while True:
        print("just printing...")
        await asyncio.sleep(1)
        gc.collect()

async def coro2():
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")

async def main():
    t1 = asyncio.create_task(coro1()) # This task has a reference.
    asyncio.create_task(coro2())      # This task doesn't.
    await asyncio.sleep(5)

asyncio.run(main())

output:

just printing...
inside coro2 - going to wait for future
Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<coro2() done, defined at ...> wait_for=<Future pending cb=[Task.task_wakeup()]>>
just printing...
just printing...
just printing...
just printing...

What happened? As I mentioned above, asyncio received the Future object and added self.__wakeup to its "done callbacks". But what references this Future object? Only our Task does (through the local variable f in coro2()'s frame)! So there is a reference cycle between the Task and the Future, and nothing outside the cycle holds a strong reference to the Task (at this stage, asyncio has already popped the callback off the self._ready queue to execute it). When gc.collect() runs, Python detects this unreachable cycle and deletes our Task.
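The same situation, reduced to plain objects: two hypothetical Node instances reference each other and only a weak reference escapes (playing the role of the WeakSet). Reference counting alone can't free the pair; the cyclic collector can. Automatic collection is paused during the demo so the result doesn't depend on when the collector happens to run:

```python
import gc
import weakref


class Node:
    """Hypothetical stand-in for both the Task and the Future."""


def make_task_future_cycle() -> weakref.ref:
    task, future = Node(), Node()
    task.awaiting = future     # task -> coroutine frame -> local variable f
    future.callback = task     # future -> done callbacks -> task.__wakeup
    return weakref.ref(task)   # only a weak reference escapes


def cycle_demo() -> tuple[bool, bool]:
    was_enabled = gc.isenabled()
    gc.disable()  # pause automatic collection so only our explicit call runs
    try:
        ref = make_task_future_cycle()
        survives_refcounting = ref() is not None  # the cycle keeps both alive
        gc.collect()  # the cyclic collector finds the unreachable cycle
        collected = ref() is None
        return survives_refcounting, collected
    finally:
        if was_enabled:
            gc.enable()


if __name__ == '__main__':
    print(cycle_demo())  # prints (True, True)
```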

2. Future object can survive:

I'm going to add just a single line to the coro2() coroutine, making f a global variable:

async def coro2():
    global f    # <---------------
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")

output:

just printing...
inside coro2 - going to wait for future
just printing...
just printing...
just printing...
just printing...

Now there is a strong reference to the Future object from outside the Task. Here is the chain:

Global namespace -> Future object -> self._callbacks -> callback -> Task object

3. Future object can survive (real-world example):

Usually we don't deal with creating Future objects ourselves. Suppose we have a simple echo server which listens for incoming connections asynchronously:

import asyncio
import socket
import gc


async def echo(connection, loop):
    while data := await loop.sock_recv(connection, 512):
        gc.collect()
        await loop.sock_sendall(connection, data)


async def listen_for_connections(server_socket, loop):
    while True:
        gc.collect()
        client_socket, client_address = await loop.sock_accept(server_socket)
        client_socket.setblocking(False)
        print(f"received a connection from {client_address}")
        asyncio.create_task(echo(client_socket, loop))  # no reference to this task


async def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ("127.0.0.1", 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()

    await listen_for_connections(server_socket, asyncio.get_running_loop())


asyncio.run(main())

Now what will happen to our echo task?

The line await loop.sock_recv(connection, 512) creates a Future object (BaseSelectorEventLoop.sock_recv() in Lib/asyncio/selector_events.py):

async def sock_recv(self, sock, n):
    ...
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass
    fut = self.create_future()
    fd = sock.fileno()
    handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
    fut.add_done_callback(functools.partial(self._sock_read_done, fd, handle=handle))
    return await fut

See that await fut? It yields a Future object. Is that similar to the first scenario? Are there any other references to the Future? Let's see what self._add_reader() does (also in selector_events.py):

    def _add_reader(self, fd, callback, *args):
        ...
        handle = events.Handle(callback, args, self, None)
        ...
        self._selector.register(fd, selectors.EVENT_READ,(handle, None))
        ...
        return handle

Cool. So our fut object is stored in the Handle's args, and that Handle is registered with the selector as the data for the socket's file descriptor.

The relationship is:

selector -> handle -> args -> fut -> task object.
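That the selector keeps a strong reference to the registered data can be checked with the public selectors API. In this sketch a socketpair and a placeholder tuple stand in for asyncio's socket and its (handle, None) data; the names are illustrative:

```python
import selectors
import socket


def selector_holds_data() -> bool:
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    try:
        a.setblocking(False)
        # Placeholder for the (handle, None) tuple asyncio registers; the real
        # Handle would hold the Future in its args.
        data = (object(), None)
        sel.register(a, selectors.EVENT_READ, data)
        # The selector's key keeps a strong reference to the data object.
        return sel.get_key(a).data is data
    finally:
        sel.close()
        a.close()
        b.close()


if __name__ == '__main__':
    print(selector_holds_data())  # prints True
```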

I have tried to explain in which cases the task gets garbage collected and in which cases it survives, but after all, I highly recommend paying attention to what the documentation highlights:

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.


At this point, a valid question is the one @MisterMiyagi asked in the comments:

In the only example where the task is collected (namely when the future was collected as well) the task could not have resumed, ever (because that would need completing the future which is impossible after collection)!

Indeed, I found it hard to come up with an example that demonstrates the issue until I came across this comment: github.com/python/cpython/issues/91887#issuecomment-1920700673. Here is a reproducible example:

import asyncio
import gc

async def background():
    remote_read, remote_write = await asyncio.open_connection("example.com", 443, ssl=False)
    await remote_read.read()

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(3)

async def main():
    asyncio.create_task(background())
    await cleanup()

asyncio.run(main())

If you dig into the line await remote_read.read() and follow the same steps as before, you reach this point (StreamReader._wait_for_data() in Lib/asyncio/streams.py):

        ...
        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            ...

self._waiter is an instance attribute holding a strong reference to the yielded Future, BUT the reference to that instance (remote_read) itself is held inside our background() coroutine. Again, a reference cycle.

A question from the comments:

can you explain why changing await f to await asyncio.sleep(999) prevents this coroutine from deleting in the middle of waiting?

The answer is that something has to hold a strong reference to the task. Let's find it:

Here is the body of the asyncio.sleep() function:

async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    ### Reaching this line means `delay` is a positive number

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,        # <------------
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

A similar pattern, right? It creates a Future object and awaits it. BUT it also registers the future with the event loop via loop.call_later():

    def call_later(self, delay, callback, *args, context=None):
        ...
        timer = self.call_at(self.time() + delay, callback, *args,
                             context=context)
        ...
        return timer

and inside call_at(), you see this:

    def call_at(self, when, callback, *args, context=None):
        ...
        timer = events.TimerHandle(when, callback, args, self, context)
        ...
        heapq.heappush(self._scheduled, timer)
        ...
        return timer

Yes, you've found another queue: self._scheduled, a heap of callbacks ordered by the time they are scheduled to run. The event loop checks this queue too, to decide when to execute each callback. So the chain is: event loop -> self._scheduled -> TimerHandle -> args -> future -> done callbacks -> task object. call_later() is a public API, so you can also use it in your own code.
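Putting the pieces together, here is a sketch of the same pattern as asyncio.sleep() using only public APIs. my_sleep(), _set_result_unless_done() and demo_sleep() are hypothetical names (the stdlib uses a private helper for the second one), so treat this as an illustration of the reference chain rather than a replacement for asyncio.sleep():

```python
import asyncio


def _set_result_unless_done(fut: asyncio.Future, result) -> None:
    # Hypothetical helper: skip futures that were cancelled in the meantime.
    if not fut.done():
        fut.set_result(result)


async def my_sleep(delay: float, result=None):
    """Hypothetical re-implementation of the asyncio.sleep() pattern above."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The returned TimerHandle sits in the loop's scheduled heap and holds
    # `future` in its args: loop -> _scheduled -> TimerHandle -> future
    # -> done callbacks -> Task, so the awaiting task stays reachable.
    handle = loop.call_later(delay, _set_result_unless_done, future, result)
    try:
        return await future
    finally:
        handle.cancel()  # don't leave a stale timer behind if we were cancelled


async def demo_sleep() -> str:
    return await my_sleep(0.01, 'woke up')


if __name__ == '__main__':
    print(asyncio.run(demo_sleep()))  # prints woke up
```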

Oscillatory answered 2/8, 2023 at 21:31
So if nobody holds references to the future object yielded by the task, then the task object is in danger. But if nobody holds references to the future, it also means that nobody can resolve it, meaning that your code already contains errors and doesn't work, right? - Lewd
@AndriiMaletskyi Yes. I think one realistic example that causes problems is when something holds a reference to the yielded future for some time and later gets rid of it (maybe not deliberately); then it's only a matter of time before the GC kicks in. Mostly I wanted to explain this relationship. (There is another path, in which the task yields None; in that case the callback is scheduled on the event loop again, so the task can survive again.) - Oscillatory
Thanks a lot for a great article! But can you explain why changing await f to await asyncio.sleep(999) prevents this coroutine from being deleted in the middle of waiting? - Calkins
I don't understand what this answer is supposed to say in practical terms. In the only example where the task is collected (namely when the future was collected as well), the task could not have resumed, ever (because that would need completing the future, which is impossible after collection)! So by all rights that task should be collected, and if we prevent that via strong references we actually introduce a memory leak... - Gezira
@Gezira This answer was mostly trying to explain what keeps tasks alive even without storing a strong reference to them. I've found the reproducible example you were looking for here: github.com/python/cpython/issues/91887#issuecomment-1920700673 - Oscillatory
@Oscillatory Thanks, that's a good find. FWIW, I would consider both the .shield case and the .read case proper examples. Consider just quoting them in your answer as examples; I think this would improve things. - Gezira
@Gezira Sure, I've added the example. - Oscillatory

In Python 3.11, there is a new API, asyncio.TaskGroup.create_task().
It does what the other answer describes (it keeps strong references to its tasks), so you don't need to do it yourself.

Regression answered 6/11, 2022 at 14:30
Thanks for the hint. However, as far as I understand the docs, you are not completely "off the hook" by using TaskGroups. Yes, the group waits for all tasks to finish. But using "async with" would block your program flow at this point, no? It would make sense to me to create one global TaskGroup and run every task within it. Hmm... one of these days I really have to take a closer look at the new TaskGroup feature to wrap my head around this. :-) - Tad
This doesn't really answer the question. A TaskGroup is just another way to keep a strong reference; it doesn't have any impact on whether one needs to do so in the first place. - Gezira
