What you're asking for is possible, but not trivial. First, note that you can never have suspends on every await
, but only on those that result in suspension of the coroutine, such as asyncio.sleep()
, or a stream.read()
that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await
only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].
With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.
import asyncio
class Suspendable:
def __init__(self, target):
self._target = target
self._can_run = asyncio.Event()
self._can_run.set()
self._task = asyncio.ensure_future(self)
def __await__(self):
target_iter = self._target.__await__()
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
# This "while" emulates yield from.
while True:
# wait for can_run before resuming execution of self._target
try:
while not self._can_run.is_set():
yield from self._can_run.wait().__await__()
except BaseException as err:
send, message = iter_throw, err
# continue with our regular program
try:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err
def suspend(self):
self._can_run.clear()
def is_suspended(self):
return not self._can_run.is_set()
def resume(self):
self._can_run.set()
def get_task(self):
return self._task
Test:
import time
async def heartbeat():
while True:
print(time.time())
await asyncio.sleep(.2)
async def main():
task = Suspendable(heartbeat())
for i in range(5):
print('suspending')
task.suspend()
await asyncio.sleep(1)
print('resuming')
task.resume()
await asyncio.sleep(1)
asyncio.run(main())
sleep(0)
probably points to a flaw in how my implementation handles cancelation. (sleep(0)
is almost always a "code smell" in asyncio code.) Maybe you need a try/except CancalledError around the innerwhile
loop, and in case ofCancelledError
dosend, message = iter_throw, exception_instance
. That way a cancel that interruptsEvent.wait
will be correctly propagated to the coroutine. – Kordulaasyncio.sleep(0)
. However, in my first attempt at the minimal example I made the mistake toawait
the suspendable, resulting in aRuntimeError
since it was alreadyawait
ed inrun_wrapper
. I'm doing this in the actual application as well, so I'm guessing theRuntimeError
might have been swallowed by uvicorn but resulted in unexpected behaviour. – Frazeerun_wrapper
, which is in turn owned by the task.run_wrapper
is only needed becausecreate_task()
AFAIR requires an actual coroutine. Perhaps I could have passed the suspendable directly toensure_future()
, but I didn't feel like experimenting, the code was involved enough as it was. – Kordulacancel()
that arrives whileSuspendable.__await__
is suspended inyield from self._can_run.wait().__await__()
. I think the cancellation will be injected into theEvent.wait
and theyield from
will raise, so the cancellation will never be propagated to the target coroutine. I thought that was why you neededsleep(0)
following resume, to ensure that the cancellation occurs afterEvent.wait
is done. That should be fixed by the change mentioned in the previous comment, after whichsleep(0)
would no longer be needed. – KordulaCancelledError
was not raised inside the coro. The exception is in fact raised at the yield from and can be caught with another try/except as you suggested. I will update the code above again to reflect these changes. With this implementation I was able to cancel the task without any addtionalasyncio.sleep(0)
, suspended or not. – Frazeeensure_future
already has the equivalent of myrun_wrapper
, so it's not necessary. Also, I added anis_suspended()
method, but I refrained from adding the task-manipulating methods, becauseget_task()
should be sufficient. (Cancellation should now "just work" with a simplex.get_task().cancel()
.) If that works for you, you can delete the copy of the solution from the question. – Kordulaasyncio.Event
while it is suspended, the__await__´ method is still at
message = yield signal, hence it never hits the
yield from. If the task is cancelled in this state the
CancelledError` is raised on the yield and then doesn't make it over theyield from
since_can_run
is still unset. This results in some sort of deadlock. I changed the while to 'while not self._can_run.is_set() and not isinstance(message, BaseException):'. – Frazee_can_run
in the exception handlers, but this would change the event state and maybe the coro wants to reject the cancellation and would then be unwillingly resumed. – Frazeewhile send is not iter_throw and not self._can_run.is_set()
, but that's equivalent to your formulation in asyncio because the event loop will resume us either with aNone
message or by delivering aCancelledError
exception. – Kordula