Is it possible to suspend and restart tasks in async Python?

The question should be simple enough, but I couldn't find anything about it.

I have an async Python program that contains a rather long-running task that I want to be able to suspend and resume at arbitrary points (arbitrary, of course, meaning anywhere there's an await keyword).

I was hoping there was something along the lines of task.suspend() and task.resume() but it seems there isn't.

Is there an API for this at the task or event-loop level, or would I need to implement it myself somehow? I don't want to place an event.wait() before every await...
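For comparison, here is a minimal sketch of the manual approach the question wants to avoid: the coroutine awaits a shared asyncio.Event at each checkpoint, so clearing the event pauses it at the next checkpoint and setting it lets it continue. The names worker, can_run and progress are illustrative, not part of any API.

```python
import asyncio


async def worker(can_run: asyncio.Event, progress: list) -> None:
    for step in range(100):
        # Explicit checkpoint: blocks here whenever can_run is cleared.
        await can_run.wait()
        progress.append(step)
        await asyncio.sleep(0.01)


async def main():
    can_run = asyncio.Event()
    can_run.set()
    progress = []
    task = asyncio.create_task(worker(can_run, progress))

    await asyncio.sleep(0.05)
    can_run.clear()                # "suspend"
    await asyncio.sleep(0.02)      # worker reaches its next checkpoint
    frozen = len(progress)
    await asyncio.sleep(0.1)
    paused_ok = len(progress) == frozen

    can_run.set()                  # "resume"
    await asyncio.sleep(0.05)
    resumed_ok = len(progress) > frozen

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return paused_ok, resumed_ok


print(asyncio.run(main()))  # (True, True)
```

The drawback is visible in worker(): every point where a pause should take effect needs its own explicit await can_run.wait().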

Frazee asked 18/3, 2021 at 8:43. Comments (13):
I think the need for an explicit sleep(0) probably points to a flaw in how my implementation handles cancellation. (sleep(0) is almost always a "code smell" in asyncio code.) Maybe you need a try/except CancelledError around the inner while loop, and in case of CancelledError do send, message = iter_throw, exception_instance. That way a cancel that interrupts Event.wait will be correctly propagated to the coroutine. - Kordula
Hmm, I think your implementation is fine. I made a minimal example from the asyncio docs for task cancellation using your code, and everything works as expected without using asyncio.sleep(0). However, in my first attempt at the minimal example I made the mistake of awaiting the suspendable, resulting in a RuntimeError since it was already awaited in run_wrapper. I'm doing this in the actual application as well, so I'm guessing the RuntimeError might have been swallowed by uvicorn but resulted in unexpected behaviour. - Frazee
Right, awaiting the suspendable is not allowed because its ownership is taken over by run_wrapper, which is in turn owned by the task. run_wrapper is only needed because create_task() AFAIR requires an actual coroutine. Perhaps I could have passed the suspendable directly to ensure_future(), but I didn't feel like experimenting; the code was involved enough as it was. - Kordula
But I'm still a bit worried about a cancel() that arrives while Suspendable.__await__ is suspended in yield from self._can_run.wait().__await__(). I think the cancellation will be injected into the Event.wait and the yield from will raise, so the cancellation will never be propagated to the target coroutine. I thought that was why you needed sleep(0) following resume, to ensure that the cancellation occurs after Event.wait is done. That should be fixed by the change mentioned in the previous comment, after which sleep(0) would no longer be needed. - Kordula
You were right to be worried :). I repeated the test with the minimal example and realized I had overlooked that, while the task got cancelled when it was suspended, the CancelledError was not raised inside the coro. The exception is in fact raised at the yield from and can be caught with another try/except, as you suggested. I will update the code above again to reflect these changes. With this implementation I was able to cancel the task without any additional asyncio.sleep(0), suspended or not. - Frazee
Thanks for pursuing this. I've now incorporated your improvements in my answer. It turns out ensure_future already has the equivalent of my run_wrapper, so it's not necessary. Also, I added an is_suspended() method, but I refrained from adding the task-manipulating methods, because get_task() should be sufficient. (Cancellation should now "just work" with a simple x.get_task().cancel().) If that works for you, you can delete the copy of the solution from the question. - Kordula
Sure, will do. Thanks for all your help! - Frazee
Thanks for asking an interesting question! I'm pretty sure similar questions were asked before (so there is interest in that functionality), and have gone unanswered because the solution is not exactly trivial. - Kordula
Yes, I was actually surprised that I didn't find anything ready-made. It feels only natural to be able to suspend a task whose literal purpose is to be suspended! I guess if you let the user do it, there's some risk of having a lot of suspended tasks in the loop, which can be a performance issue. - Frazee
One possible issue is that the additional layer of suspension comes without cooperation from the task, which might cause issues if the coroutine expects to be resumed soon after an event. For example, imagine a coroutine waiting for a socket to become readable in order to read from it. If it gets suspended during the wait, its asyncio task might keep getting woken up on every event loop iteration because, AFAIK, the event loop's polling of file descriptors is level-triggered. That might cause the event loop to never go to sleep and to remain in a kind of busy loop until the task is force-resumed. - Kordula
Speaking of issues, I think I just stumbled on another one... It seems that when the target coroutine is waiting on some asyncio.Event while it is suspended, the __await__ method is still at message = yield signal, so it never reaches the yield from. If the task is cancelled in this state, the CancelledError is raised at the yield and then doesn't make it past the yield from, since _can_run is still unset. This results in a kind of deadlock. I changed the while to 'while not self._can_run.is_set() and not isinstance(message, BaseException):'. - Frazee
This works, but it feels kind of hacky. One could also set _can_run in the exception handlers, but this would change the event state, and if the coro wants to reject the cancellation, it would then be resumed against the user's will. - Frazee
The question is what you want to happen when a suspended task is cancelled. My implementation takes suspension seriously and waits for the task to be resumed before delivering the cancellation. (I'm not sure how a deadlock occurs in your usage.) I think it's OK to change the code the way you did, if those are the semantics you need. I might have written the loop condition as while send is not iter_throw and not self._can_run.is_set(), but that's equivalent to your formulation in asyncio because the event loop will resume us either with a None message or by delivering a CancelledError exception. - Kordula
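The loop-condition change from the last few comments can be checked with a self-contained sketch. It is a copy of the answer's Suspendable with only the inner while condition replaced by the send is not iter_throw form; everything else is kept as in the answer. With this change, cancelling a suspended task whose target is blocked on its own Event finishes the task without any resume(); with the original condition, the cancellation would stay pending until the task is resumed.

```python
import asyncio


class Suspendable:
    """The answer's Suspendable with one changed loop condition: a pending
    exception (e.g. a cancellation) is thrown into the target immediately,
    even while the wrapper is suspended."""

    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        while True:
            try:
                # Changed line: skip waiting when an exception must be delivered.
                while send is not iter_throw and not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task


async def main():
    never = asyncio.Event()           # the target blocks on this forever
    s = Suspendable(never.wait())
    await asyncio.sleep(0)            # let the task start and block
    s.suspend()
    s.get_task().cancel()             # cancel while suspended, never resume
    done, _ = await asyncio.wait({s.get_task()}, timeout=1)
    return s.get_task() in done and s.get_task().cancelled()


print(asyncio.run(main()))  # True
```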

What you're asking for is possible, but not trivial. First, note that you can never suspend at every await, but only at those that actually result in suspension of the coroutine, such as asyncio.sleep(), or a stream.read() that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. For more details, see these questions: [1], [2], [3], [4].
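To see that an await does not necessarily drop to the event loop, the following sketch schedules a second task and checks whether it got a chance to run while a coroutine was being awaited. The helper names (no_suspend, suspend_once, other_task_ran_during) are made up for illustration.

```python
import asyncio


async def no_suspend():
    # Completes without ever yielding to the event loop.
    return 42


async def suspend_once():
    # asyncio.sleep(0) yields to the event loop exactly once.
    await asyncio.sleep(0)
    return 42


async def other_task_ran_during(coro_fn):
    """Return True if another task got to run while coro_fn() was awaited."""
    ran = []

    async def bystander():
        ran.append(True)

    task = asyncio.create_task(bystander())  # scheduled, not yet running
    await coro_fn()
    ran_during_await = bool(ran)
    await task
    return ran_during_await


async def main():
    return (await other_task_ran_during(no_suspend),
            await other_task_ran_during(suspend_once))


print(asyncio.run(main()))  # (False, True)
```

Only suspend_once() hands control to the event loop, so only there could a wrapper that intercepts resumptions get a chance to pause the task.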

With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.

import asyncio

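class Suspendable:
    """Run a coroutine in a task that can be suspended and resumed."""

    def __init__(self, target):
        self._target = target
        # The target is only resumed while this event is set.
        self._can_run = asyncio.Event()
        self._can_run.set()
        # ensure_future() wraps this awaitable in a task and starts it, so a
        # Suspendable should not be awaited directly.
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                # e.g. a cancellation that arrives while suspended: pass it
                # on to the target instead of swallowing it here.
                send, message = iter_throw, err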

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

Test:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())
Kordula answered 18/3, 2021 at 16:53. Comments (5):
Thanks for this great solution and the additional links and clarifications! I will edit my post to reflect my final code. - Frazee
Not sure if you need self._task = asyncio.ensure_future(self). In my case it caused __await__ to be called twice, leading to a "RuntimeError: cannot reuse already awaited coroutine" at yield signal. But without self._task this code saved my day. :) - Circosta
@Circosta Can you provide a pastebin (or similar) with the test code that leads to that exception? I'd like to fix the code, but the test code in the answer doesn't raise. - Kordula
@Kordula Oh I see, you don't await the task. I was trying something like async def main(): await Suspendable(heartbeat()). Here is a more complete example: pastebin.com/KCiLLtrH - Circosta
@Circosta Thanks for the example. I think the idea is that you don't await the task, but run it in the background, only occasionally suspending/resuming it. The __await__() method is an implementation detail where I placed the interception code, which is confusing here because it makes it look like Suspendable is awaitable. If one were to remove ensure_future(), then the task wouldn't automatically start. But there are probably other ways to start it, so your modification is likely correct. - Kordula
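A small sketch of the ownership issue discussed in these comments: asyncio.ensure_future() accepts any object with __await__ (create_task() only accepts coroutine objects), and the resulting task then drives the wrapped coroutine, so awaiting the wrapper a second time tries to reuse a finished coroutine. Delegate is a hypothetical stand-in for Suspendable, reduced to the forwarding part; this shows the finished-coroutine case, and the interleaving in Circosta's pastebin may differ.

```python
import asyncio


class Delegate:
    """Hypothetical minimal awaitable that forwards to a coroutine."""

    def __init__(self, target):
        self._target = target

    def __await__(self):
        return (yield from self._target.__await__())


async def work():
    await asyncio.sleep(0)
    return "done"


async def main():
    results = []

    # create_task() rejects anything that is not a coroutine object.
    coro = work()
    try:
        asyncio.create_task(Delegate(coro))
    except TypeError:
        results.append("create_task: TypeError")
    finally:
        coro.close()  # never ran; close it to avoid a "never awaited" warning

    # ensure_future() wraps the awaitable in a task, which now owns work().
    d = Delegate(work())
    task = asyncio.ensure_future(d)
    results.append(await task)

    # Awaiting the same Delegate again re-drives the finished coroutine.
    try:
        await d
    except RuntimeError as err:
        results.append(str(err))
    return results


print(asyncio.run(main()))
# ['create_task: TypeError', 'done', 'cannot reuse already awaited coroutine']
```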
