Passing a coroutine to AbstractEventLoop.call_later

The following is the code I'm trying to get working:

>>> import asyncio
>>> async def foo(loop, iv):
...     await asyncio.sleep(1, loop=loop)
...     print(f'done: {iv}')
...     
>>> loop = asyncio.get_event_loop()
>>> loop.call_later(2, foo, loop, 10)
<TimerHandle when=36395.554089349 foo(<_UnixSelecto...e debug=False>, 10) at <input>:1>
>>> loop.run_forever()

(Python 3.6)

foo() awaits a chain of other coroutines, so it has to be declared async. It also has to start only after a delay, but when I run the code above I get the following warning:

/usr/lib64/python3.6/asyncio/events.py:127: RuntimeWarning: coroutine 'foo' was never awaited
  self._callback(*self._args)

What is the correct way to schedule this coroutine with call_later()?

Deltoid answered 13/1, 2018 at 0:7

call_later() only supports callbacks (regular functions); you simply can’t pass in a coroutine.
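
To see where the warning comes from: calling an async def function does not run its body, it only builds a coroutine object, and call_later() discards whatever the callback returns. A minimal sketch (this foo is a simplified stand-in for the one in the question, and asyncio.run() assumes Python 3.7+):

```python
import asyncio

async def foo(iv):
    await asyncio.sleep(0)
    return iv

# Calling foo() runs nothing yet; it only creates a coroutine object.
coro = foo(10)
is_coro = asyncio.iscoroutine(coro)

# The body only executes once an event loop drives the coroutine.
result = asyncio.run(coro)
print(is_coro, result)
```

When call_later() invokes foo as a plain callback, that coroutine object is created and immediately dropped, which is what "was never awaited" is reporting.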

If you want to delay a coroutine, you have two options: either delay the coroutine by having it sleep at the start, or have call_later() call asyncio.create_task(), which does take a coroutine and schedules it to run.

If you use asyncio.sleep() at the start of the coroutine, you can have the loop execute it directly:

async def foo(iv):
    # delay start of the work
    await asyncio.sleep(2)

    # rest of your coroutine

You could easily use a wrapper coroutine to do this:

async def await_coro_later(delay, coro, *args, **kwargs):
    await asyncio.sleep(delay)
    await coro(*args, **kwargs)
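
If the caller should not block while waiting, the wrapper can itself be scheduled as a task. A rough sketch, assuming Python 3.7+ (the log list, the short 0.05 s delay and the simplified foo are only there for illustration):

```python
import asyncio

async def await_coro_later(delay, coro, *args, **kwargs):
    await asyncio.sleep(delay)
    return await coro(*args, **kwargs)

async def foo(iv, log):
    log.append(f'done: {iv}')

async def main():
    log = []
    # Schedule the delayed call in the background; this returns immediately.
    task = asyncio.create_task(await_coro_later(0.05, foo, 10, log))
    log.append('scheduled')
    # Wait for the delayed coroutine to finish before leaving main().
    await task
    return log

events = asyncio.run(main())
print(events)
```

Keeping a reference to the task also lets you cancel the delayed call with task.cancel() before it starts.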

If you use asyncio.create_task() (or, for Python 3.6 or older, asyncio.ensure_future()), you can pass that to call_later():

# create a task for foo(10) later
loop.call_later(2, asyncio.create_task, foo(10))

Demo of either technique:

>>> import asyncio
>>> async def foo(iv, start):
...     await asyncio.sleep(1)
...     offset = asyncio.get_running_loop().time() - start
...     print(f'done ({offset:.3f}s): {iv}')
...
>>> async def await_coro_later(delay, coro, *args, **kwargs):
...     await asyncio.sleep(delay)
...     await coro(*args, **kwargs)
...
>>> async def demo():
...     loop = asyncio.get_running_loop()
...     start = loop.time()
...     loop.call_later(2, asyncio.create_task, foo('cb_to_create_task', start))
...     await await_coro_later(5, foo, 'coro_later', start)
...
>>> asyncio.run(demo())
done (3.004s): cb_to_create_task
done (6.006s): coro_later
Wrand answered 13/1, 2018 at 0:12

Small addition.

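It seems better to use a lambda. Instead of loop.call_later(2, asyncio.create_task, foo('cb_to_create_task', start)), use loop.call_later(2, lambda: asyncio.create_task(foo('cb_to_create_task', start))).

This avoids "RuntimeWarning: coroutine 'ResultCursor.finalize' was never awaited" when timeout cancellation is used and asyncio debugging is enabled. Without the lambda, the coroutine object is created as soon as call_later() is called, so a cancelled timer leaves it unawaited; with the lambda, it is only created when the callback actually runs.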
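
The difference can be reproduced with a small experiment. This is only a sketch under my own assumptions: foo, eager, lazy and cancel_before_firing are made-up names, and the behaviour shown is what I would expect on CPython, where a dropped coroutine is finalized right away:

```python
import asyncio
import gc
import warnings

async def foo(tag):
    return tag

async def cancel_before_firing(schedule):
    """Schedule a far-future callback, cancel it, and return RuntimeWarning messages."""
    loop = asyncio.get_running_loop()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        handle = schedule(loop)
        handle.cancel()   # the timer never fires
        gc.collect()      # finalize anything that was just dropped
    return [str(w.message) for w in caught
            if issubclass(w.category, RuntimeWarning)]

def eager(loop):
    # foo('eager') builds the coroutine object now, before the timer fires.
    return loop.call_later(60, asyncio.create_task, foo('eager'))

def lazy(loop):
    # The coroutine object is only built if the callback actually runs.
    return loop.call_later(60, lambda: asyncio.create_task(foo('lazy')))

eager_warnings = asyncio.run(cancel_before_firing(eager))
lazy_warnings = asyncio.run(cancel_before_firing(lazy))
print(eager_warnings)
print(lazy_warnings)
```

The eager variant should report a "never awaited" warning after the cancel, while the lazy one should stay silent because no coroutine object ever existed.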

Platon answered 10/12, 2022 at 8:3

A better way I found is something like this:

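import asyncio
import typing as t


def schedule_at(coro: t.Coroutine, when: float, loop: t.Optional[asyncio.AbstractEventLoop] = None) -> asyncio.Task:
    # `when` is an absolute time on the loop's own clock (loop.time()), not wall-clock time
    loop = loop or asyncio.get_event_loop()
    event = asyncio.Event()
    loop.call_at(when, event.set)

    async def wrapper():
        # wait until the timer fires, then run the coroutine
        await event.wait()
        return await coro

    # create the task on the same loop the timer was scheduled on
    return loop.create_task(wrapper())


def schedule_after(coro: t.Coroutine, delay: float, loop: t.Optional[asyncio.AbstractEventLoop] = None) -> asyncio.Task:
    # `delay` is in seconds, relative to now
    loop = loop or asyncio.get_event_loop()
    return schedule_at(coro, loop.time() + delay, loop)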
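
A usage sketch, assuming Python 3.7+. To keep it self-contained it uses a condensed copy of the helper (call_later() instead of call_at(), and get_running_loop() as the default), and foo and the 0.1 s delay are placeholders:

```python
import asyncio

def schedule_after(coro, delay, loop=None):
    """Condensed variant of the helper: run `coro` once `delay` seconds have passed."""
    loop = loop or asyncio.get_running_loop()
    event = asyncio.Event()
    loop.call_later(delay, event.set)

    async def wrapper():
        await event.wait()
        return await coro

    return loop.create_task(wrapper())

async def foo(iv):
    return f'done: {iv}'

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Returns a Task right away; foo(10) only starts after the delay.
    task = schedule_after(foo(10), 0.1)
    result = await task
    return result, loop.time() - start

result, elapsed = asyncio.run(main())
print(result, f'{elapsed:.3f}s')
```

Because a Task comes back, the caller can await the result or cancel it like any other task.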
Bot answered 14/5, 2024 at 12:34