As I realized that more and more people (including me) struggle to understand "why" they need to keep references to tasks when their code already works just fine, I decided to explain what's going on behind the scenes. I'll show what references the task at each step, when the code works, and when it doesn't.
If you want to follow along, I'm using Python 3.11.
Let's start by creating a task. When we pass a coroutine to the asyncio.create_task() function, it creates a Task object (here):
...
task = tasks.Task(coro, loop=self, name=name, context=context)
...
In Task's initializer, we have these lines (here):
...
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
...
The second line registers the task in a WeakSet, which holds only a weak reference to the task. Weak references are not counted by the reference-counting mechanism, so this registration does not keep the task alive.
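To see what "weak" means here, this is a minimal sketch with a plain weakref.WeakSet, the same container type described above. Item is a hypothetical placeholder class. On CPython, the entry disappears as soon as the last strong reference is dropped:

```python
import weakref


class Item:
    """Hypothetical placeholder; any object that supports weak references works."""


def demo():
    registry = weakref.WeakSet()  # same container type as the task registry above
    item = Item()
    registry.add(item)            # the set stores only a weak reference to item
    size_while_referenced = len(registry)
    del item                      # drop the only strong reference
    return size_while_referenced, len(registry)


if __name__ == "__main__":
    print(demo())  # (1, 0) on CPython
```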
You see that self.__step? It's a callback. More precisely, it's a bound method, so it holds a reference to the newly created Task object. We pass it to the call_soon() method, which schedules that callback (wrapped in a Handle object) for execution (here):
...
handle = events.Handle(callback, args, self, context)
...
self._ready.append(handle)
...
What is self._ready? It's the final queue from which the event loop takes callbacks (here):
...
handle = self._ready.popleft()
...
else:
    handle._run()
...
So up to this point, before our callback is popped off the queue, the task is referenced through this chain:
eventloop -> self._ready -> Handle -> callback -> task object
There is a strong reference to our task object which prevents it from being garbage collected.
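You can reproduce this chain without asyncio. In the sketch below, a collections.deque stands in for self._ready and a bound method stands in for self.__step. FakeTask is a hypothetical stand-in for Task, and the Handle wrapper is left out for brevity. The object stays alive while its bound method is queued. On CPython it is freed as soon as the popped callback is released:

```python
import gc
import weakref
from collections import deque


class FakeTask:
    """Hypothetical stand-in for asyncio.Task; only its step() method matters here."""

    def step(self):
        return "stepped"


def demo():
    ready = deque()               # plays the role of loop._ready
    task = FakeTask()
    task_ref = weakref.ref(task)  # observe the object without keeping it alive
    ready.append(task.step)       # a bound method keeps the instance in __self__
    del task                      # drop our own name for the object

    gc.collect()
    alive_while_queued = task_ref() is not None  # deque -> bound method -> task

    callback = ready.popleft()    # what _run_once() does with a Handle
    result = callback()
    del callback                  # last strong reference is gone

    return alive_while_queued, result, task_ref() is None


if __name__ == "__main__":
    print(demo())  # (True, 'stepped', True) on CPython
```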
So far so good. What happens when one cycle of the event loop runs? Our callback is no longer in self._ready, so is the only strong reference now gone? Wait...
Usually we wait on an awaitable object inside our task, most of the time a Future object returned from an I/O call. When our callback runs, the coroutine yields that Future object (here):
...
result = coro.send(None)
...
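To see what coro.send(None) hands back, you can drive a coroutine by hand, which is roughly what __step does. This sketch creates an event loop only to obtain a Future and never runs it. waits_on() is a hypothetical helper, and _asyncio_future_blocking is a private attribute that __step inspects:

```python
import asyncio


async def waits_on(fut):
    # Suspends at `await fut` until the future has a result.
    return await fut


def demo():
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        coro = waits_on(fut)
        yielded = coro.send(None)  # first step: runs until `await fut` yields
        blocking = yielded._asyncio_future_blocking  # flag that Task.__step checks
        fut.set_result(42)         # normally done when the I/O completes
        try:
            coro.send(None)        # second step: `await fut` now returns
        except StopIteration as exc:
            return yielded is fut, blocking, exc.value
    finally:
        loop.close()


if __name__ == "__main__":
    print(demo())  # (True, True, 42)
```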
Then asyncio receives the Future and adds a callback to its "done callbacks" (here):
result.add_done_callback(
    self.__wakeup, context=self._context)
Again, that callback (the bound method self.__wakeup) holds a reference to the Task object. Now here is the most important part of this post:
The yielded Future object has a reference to the Task object. But can the Future itself survive? What references the Future object? As long as the Future has strong references, our Task can live without any problem. Otherwise, the task is also going to be garbage collected.
I'm going to show three scenarios to see that in action.
1. Future object can't survive:
Suppose we create the Future object inside our task:
import asyncio
import gc

async def coro1():
    while True:
        print("just printing...")
        await asyncio.sleep(1)
        gc.collect()

async def coro2():
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")

async def main():
    t1 = asyncio.create_task(coro1())  # This task has a reference.
    asyncio.create_task(coro2())  # This task doesn't.
    await asyncio.sleep(5)

asyncio.run(main())
output:
just printing...
inside coro2 - going to wait for future
Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<coro2() done, defined at ...> wait_for=<Future pending cb=[Task.task_wakeup()]>>
just printing...
just printing...
just printing...
just printing...
What happened? As mentioned above, asyncio received the Future object and added self.__wakeup to its "done callbacks". But what references this Future object? Only our Task! There is a reference cycle between the Task and the Future, and nothing outside that cycle holds a strong reference to the Task, because at this stage asyncio has already popped the callback off the self._ready queue to execute it. When gc.collect() runs, Python detects this cycle and deletes our Task.
2. Future object can survive:
I'm going to add just a single line to the coro2() coroutine, making f a global variable:
async def coro2():
    global f  # <---------------
    loop = asyncio.get_running_loop()
    f = loop.create_future()
    print("inside coro2 - going to wait for future")
    await f
    print("inside coro2 - future resolved")
output:
just printing...
inside coro2 - going to wait for future
just printing...
just printing...
just printing...
just printing...
Now there is a strong reference to the Future object. Here is the chain:
Global namespace -> Future object -> self._callbacks -> callback -> Task object
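You can check both scenarios with a weak reference to the task instead of waiting for the "Task was destroyed" log line. In this sketch, a list passed into the coroutine plays the role of the global variable. If the caller keeps the list, the future stays reachable, and so does the task through the future's done callback. waiter() and task_survives_gc() are hypothetical names. The expected (False, True) assumes CPython 3.11 as used in this post. The False case also logs "Task was destroyed but it is pending!" to stderr:

```python
import asyncio
import gc
import weakref


async def waiter(holder):
    # Create the future inside the task, as coro2() does above.
    fut = asyncio.get_running_loop().create_future()
    holder.append(fut)  # anyone who references `holder` can now reach `fut`
    await fut


async def task_survives_gc(keep_future: bool) -> bool:
    holder = []
    # Keep only a weak reference, like an unsaved create_task() call.
    task_ref = weakref.ref(asyncio.create_task(waiter(holder)))
    if not keep_future:
        holder = None  # the list is now reachable only from the task's coroutine
    await asyncio.sleep(0)  # let the task run until it blocks on `fut`
    gc.collect()
    alive = task_ref() is not None
    if alive:
        holder[0].set_result(None)  # resolve the future so the task can finish
        await asyncio.sleep(0)
    return alive


async def main():
    return await task_survives_gc(False), await task_survives_gc(True)


if __name__ == "__main__":
    print(asyncio.run(main()))  # (False, True) on CPython 3.11
```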
3. Future object can survive (real-world example):
Usually we don't create Future objects ourselves. Suppose we have a simple echo server that listens for incoming connections asynchronously:
import asyncio
import socket
import gc

async def echo(connection, loop):
    while data := await loop.sock_recv(connection, 512):
        gc.collect()
        await loop.sock_sendall(connection, data)

async def listen_for_connections(server_socket, loop):
    while True:
        gc.collect()
        client_socket, client_address = await loop.sock_accept(server_socket)
        client_socket.setblocking(False)
        print(f"received a connection from {client_address}")
        asyncio.create_task(echo(client_socket, loop))  # no reference to this task

async def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ("127.0.0.1", 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()
    await listen_for_connections(server_socket, asyncio.get_running_loop())

asyncio.run(main())
Now what will happen to our echo task?
The line await loop.sock_recv(connection, 512) creates a Future object (here):
async def sock_recv(self, sock, n):
    ...
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass
    fut = self.create_future()
    fd = sock.fileno()
    handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
    fut.add_done_callback(functools.partial(self._sock_read_done, fd, handle=handle))
    return await fut
You see that await fut? It yields a Future object. Is that similar to the first scenario? Are there any other references to the Future? Let's see what self._add_reader() does (here):
def _add_reader(self, fd, callback, *args):
    ...
    handle = events.Handle(callback, args, self, None)
    ...
    self._selector.register(fd, selectors.EVENT_READ, (handle, None))
    ...
    return handle
Cool. So our fut object is stored in the Handle's args, and that Handle is registered with the selector as the data for this file descriptor.
The relationship is:
selector -> handle -> args -> fut -> task object.
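This chain can be checked offline, without a client connecting to port 8000, by using a socket.socketpair() in place of the accepted connection. This sketch assumes the default selector-based event loop on Linux/macOS. Windows' proactor loop keeps its pending futures through a different structure. receive() is a hypothetical stand-in for echo():

```python
import asyncio
import gc
import socket
import weakref


async def receive(loop, sock, received):
    # Same shape as echo(): nothing outside this coroutine references the task.
    received.append(await loop.sock_recv(sock, 512))


async def main():
    loop = asyncio.get_running_loop()
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    received = []
    try:
        task_ref = weakref.ref(asyncio.create_task(receive(loop, reader, received)))
        await asyncio.sleep(0)  # let the task block inside sock_recv()
        gc.collect()
        survived = task_ref() is not None  # selector -> Handle -> args -> fut -> task
        writer.send(b"ping")
        for _ in range(100):  # give the loop a few iterations to deliver the data
            if received:
                break
            await asyncio.sleep(0.01)
        return survived, received
    finally:
        reader.close()
        writer.close()


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, [b'ping'])
```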
I tried to explain in which cases the task is going to be garbage collected and in which cases it survives. Still, I highly recommend paying attention to what is highlighted in the documentation:
Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.
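The fix the asyncio documentation suggests right after that warning is to keep each task in a collection and remove it when it finishes. Here is a minimal sketch of that pattern. spawn() and work() are hypothetical names. On Python 3.11, asyncio.TaskGroup is another way to hold strong references for the lifetime of a group of tasks:

```python
import asyncio

background_tasks = set()  # strong references to tasks we don't otherwise keep


def spawn(coro):
    """Hypothetical helper: create a task and keep it alive until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference
    task.add_done_callback(background_tasks.discard)  # released once the task is done
    return task


async def main():
    results = []

    async def work(n):
        await asyncio.sleep(0.01)
        results.append(n * 2)

    for i in range(3):
        spawn(work(i))  # fire-and-forget: the returned task is ignored
    while background_tasks:
        await asyncio.sleep(0.01)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4]
```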
At this point, a valid question is the one @MisterMiyagi asked in the comments:
In the only example where the task is collected (namely when the future was collected as well) the task could not have resumed, ever (because that would need completing the future which is impossible after collection)!
Indeed, I found it hard to come up with an example that demonstrates the issue until I came across this link. Here is a reproducible example:
import asyncio
import gc

async def background():
    remote_read, remote_write = await asyncio.open_connection("example.com", 443, ssl=False)
    await remote_read.read()

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(3)

async def main():
    asyncio.create_task(background())
    await cleanup()

asyncio.run(main())
If you dig into the line await remote_read.read() and follow the same steps as earlier, you reach this point (here):
...
self._waiter = self._loop.create_future()
try:
    await self._waiter
finally:
    ...
self._waiter is an instance attribute holding a strong reference to the yielded Future, BUT the reference to that instance (remote_read) itself lives inside our background() coroutine. Again, a circular reference.
A question in the comments:
Can you explain why changing await f to await asyncio.sleep(999) prevents this coroutine from deleting in the middle of waiting?
The answer is that something has to hold a strong reference to the task. Let's find it.
Here is the body of the asyncio.sleep function:
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result
    ### Reaching this line means `delay` is a positive number
    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,  # <------------
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()
Similar pattern, right? It creates a future object and awaits it. BUT it also registers that future somewhere through loop.call_later():
def call_later(self, delay, callback, *args, context=None):
    ...
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    ...
    return timer
And inside call_at(), you see this:
def call_at(self, when, callback, *args, context=None):
    ...
    timer = events.TimerHandle(when, callback, args, self, context)
    ...
    heapq.heappush(self._scheduled, timer)
    ...
    return timer
Yes, you found another queue: self._scheduled, which holds callbacks scheduled by time. asyncio checks this queue too, to see when it should run a particular callback. The chain is therefore: event loop -> self._scheduled -> TimerHandle -> args -> future -> callback -> task object. call_later() is a public API, so you are free to use it in your own code.
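As an illustration, here is a simplified sleep() built on loop.call_later(), along with a check that a task with no saved reference survives gc.collect() while it waits. This is a sketch, not asyncio's actual code. my_sleep() and _resolve() are hypothetical names, and _resolve() only mirrors the idea of the private futures._set_result_unless_cancelled helper:

```python
import asyncio
import gc
import weakref


def _resolve(fut, value):
    # Mirrors the idea of futures._set_result_unless_cancelled (a private helper).
    if not fut.cancelled():
        fut.set_result(value)


async def my_sleep(delay, result=None):
    """Simplified sketch of asyncio.sleep() for delay > 0."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    timer = loop.call_later(delay, _resolve, fut, result)  # TimerHandle keeps fut alive
    try:
        return await fut
    finally:
        timer.cancel()


async def main():
    done = []

    async def sleeper():
        done.append(await my_sleep(0.05, "woke up"))

    task_ref = weakref.ref(asyncio.create_task(sleeper()))  # no strong reference saved
    await asyncio.sleep(0)  # let sleeper() block inside my_sleep()
    gc.collect()
    survived = task_ref() is not None  # loop._scheduled -> TimerHandle -> fut -> task
    await asyncio.sleep(0.1)  # the 0.05 s timer fires first
    return survived, done


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, ['woke up'])
```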