Python coroutines: Release context manager when pausing
Asked Answered
O

1

12

Background: I'm a very experienced Python programmer who is completely clueless about the new coroutines/async/await features. I can't write an async "hello world" to save my life.

My question is: I am given an arbitrary coroutine function f. I want to write a coroutine function g that will wrap f, i.e. I will give g to the user as if it was f, and the user will call it and be none the wiser, since g will be using f under the hood. Like when you decorate a normal Python function to add functionality.

The functionality that I want to add: Whenever the program flow goes into my coroutine, it acquires a context manager that I provide, and as soon as program flow goes out of the coroutine, it releases that context manager. Flow comes back in? Re-acquire the context manager. It goes back out? Re-release it. Until the coroutine is completely finished.

To demonstrate, here is the described functionality with plain generators:

def generator_wrapper(_, *args, **kwargs):
    gen = function(*args, **kwargs)
    method, incoming = gen.send, None
    while True:
        with self:
            outgoing = method(incoming)
        try:
            method, incoming = gen.send, (yield outgoing)
        except Exception as e:
            method, incoming = gen.throw, e

Is it possible to do it with coroutines?

Ode answered 10/5, 2019 at 13:14 Comment(3)
Can you please make your example reproducable? What is self and function? How do you call generator_wrapper and why does it have _ as first parameter?Lelialelith
If that's confusing, you can just ignore the example. The functionality I want is described by the paragraph above it.Ode
To learn about coroutines, async, await, and the event loop, I highly recommend watching this lecture by David Beazley. The lecturer creates a simple event loop from scratch in front of live audience, precisely showing the interplay between coroutines and the event loop. (Don't be put off by the lecture's use of the older yield from syntax, await is just a very thin syntactic sugar over yield from, as explained in MisterMiyagi's answer.)Asaasabi
L
12

Coroutines are built on iterators - the __await__ special method is a regular iterator. This allows you to wrap the underlying iterator in yet another iterator. The trick is that you must unwrap the iterator of your target using its __await__, then re-wrap your own iterator using your own __await__.

The core functionality that works on instantiated coroutines looks like this:

class CoroWrapper:
    """Wrap ``target`` to have every send issued in a ``context``"""
    def __init__(self, target: 'Coroutine', context: 'ContextManager'):
        self.target = target
        self.context = context

    # wrap an iterator for use with 'await'
    def __await__(self):
        # unwrap the underlying iterator
        target_iter = self.target.__await__()
        # emulate 'yield from'
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        while True:
            # communicate with the target coroutine
            try:
                with self.context:
                    signal = send(message)
            except StopIteration as err:
                return err.value
            # communicate with the ambient event loop
            # depending on the signal, we need a different send method
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err
            else:
                send = iter_send

Note that this explicitly works on a Coroutine, not an Awaitable - Coroutine.__await__ implements the generator interface. In theory, an Awaitable does not necessarily provide __await__().send or __await__().throw.

This is enough to pass messages in and out:

import asyncio


class PrintContext:
    def __enter__(self):
        print('enter')

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('exit via', exc_type)
        return False


async def main_coro():
    print(
        'wrapper returned',
        await CoroWrapper(test_coro(), PrintContext())
    )


async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

asyncio.run(main_coro())
# enter
# exit via None
# enter
# exit <class 'StopIteration'>
# wrapper returned 2

You can delegate the wrapping part to a separate decorator. This also ensures that you have an actual coroutine, not a custom class - some async libraries require this.

from functools import wraps


def send_context(context: 'ContextManager'):
    """Wrap a coroutine to issue every send in a context"""
    def coro_wrapper(target: 'Callable[..., Coroutine]') -> 'Callable[..., Coroutine]':
        @wraps(target)
        async def context_coroutine(*args, **kwargs):
            return await CoroWrapper(target(*args, **kwargs), context)
        return context_coroutine
    return coro_wrapper

This allows you to directly decorate a coroutine function:

@send_context(PrintContext())
async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

print('async run returned:', asyncio.run(test_coro()))
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# async run returned: 2
Luteal answered 10/5, 2019 at 14:43 Comment(16)
Excellent answer. A minor quibble: regular iterator is probably a bit optimistic. Although an iterator might work for simple cases, your actual example expects __await__ to return a generator (or an object mimicking it with duck typing), as it calls the generator methods send and throw, neither of which are available on an ordinary iterator. I think you should also call close() the target_iter once you're done, otherwise its finally clauses will not run or will run later than necessary.Asaasabi
@Asaasabi Interestingly enough, the specification for __await__ only requires an iterator. For example, def __await__(self): return iter([1, 2, 3, 4]) is valid and works. However, event loops (and the shown class) expect coroutines, not awaitables - a coroutine's __await__ implements the generator interface. There is no need to explicitly close target_iter, since the wrapper will go on until target_iter throws StopIteration - this only happens when the underlying iterator is done.Luteal
Agreed about closing target_iter and StopIteration. As for event loops expecting coroutines, I don't think that's true for asyncio. Future is an example of an awaitable that is not a coroutine, and is accepted everywhere. The only place that requires a coroutine is create_task, which is specifically designed to drive a coroutine.Asaasabi
@Asaasabi You are correct. Future is not a coroutine, though Future.__await__ provides a generator. It seems like the minimum expected functionality is a generator-interface-__await__.Luteal
Do you think an __await__ that is just an iterator wouldn't work in asyncio? I think it should work, but it will simply not support features like propagating cancellation, which requires .throw(exc).Asaasabi
@Asaasabi asyncio works by using send and throw. Whatever you feed to asyncio must implement the generator interface, at least for __await__. In principle, you can use an iterator-__await__ somewhere down the stack, but asyncio will choke when trying to communicate with it. Basically an iterator-__await__ is a one-way communication, but asyncio expects bidirectional. You can find my writeup on the implications of iterator-__await__ here: #49191025Luteal
Whatever you feed to asyncio must implement the generator interface, at least for await I don't think this is true. A simple test shows that you can pass an awaitable implemented with an iterator to asyncio. This is explicitly supported in ensure_future. You can also pass such awaitables to combinators like gather, all of which call ensure_future internally, so that works just fine too.Asaasabi
Basically an iterator-__await__ is a one-way communication, but asyncio expects bidirectional. Asyncio only sends None to the coroutine, which is why it works with an iterator just fine. The only case when something more is needed is when CancelledError is injected into the coroutine, and that is handled by _wrap_awaitable (whose yield from, I suppose, simply ignores the exception it can't pass to the iterator it was delegated to).Asaasabi
@Asaasabi Alright, "whatever you feed to asyncio's async interface should really emulate the coroutine interface if you want full functionality, but of course asyncio lugs around an incredible array of sync/legacy wrappers that are poorly separated into public/private interface and you can freely use those if you feel like it, especially if you do not communicate with the event loop at all". asyncio.run does an explicit type check for coroutines/generators, by the way.Luteal
Hmm. The only functionality I'm aware of that wouldn't work is cancellation. Is there something else I'm missing? Supporting non-generators in __await__ is extremely useful for things like Cython or Python/C implementations of awaitables, so I don't see it as a bad thing and don't really understand the sentiment underlying your comment. (Perhaps I'm misreading you, but you sound frustrated by this part of asyncio design.) I'm aware of asyncio.run() requiring a coroutine, and it's vaguely annoying because something as simple as asyncio.run(gather(a(), b())) doesn't work as a result.Asaasabi
@Asaasabi The full coroutine interface is basically send(None) and throw. Not supporting cancellation is not some small detail, it is half the interface; notably one that sets async apart from threads. Not picking a consistent communication interface means that all asyncio material, including answers, documentation, and code, is ripe with special casing of things that may-or-may-not-work. This is exemplified by basic async operations like gather being a mess of 70 lines that does not even fit a common async interface - compared to, say, a 4 line coroutine in trio.Luteal
You're making a good point, iterator-based __await__ fundamentally doesn't support cancellation, which is a bummer. Cancellation can still be supported with custom awaitables using duck typing. The yield from in _wrap_awaitable will automatically make use of the throw if the inner iterator provides it, thus allowing injection of CancelledError. This can be (and likely is) used by Cython, and then we're no longer talking about pure-iterator __await__, but about duck-typed-generator one, which is a different beast.Asaasabi
I think you can replace return err.args[0] if err.args else None with return err.value.Asaasabi
@Asaasabi Thanks for the hint! Tested and verified it with the docs, err.value is valid on all Python versions that support async.Luteal
No problem! That occurred to me while reviewing your code in preparation for the answer here. While the problems this answer solves occur rarely, when they are this is truly an elegant solution for what would otherwise be an almost insurmountable issue.Asaasabi
Another use of the technique from this answer.Asaasabi

© 2022 - 2024 — McMap. All rights reserved.