asyncio as_yielded from async generators
I'm looking to be able to yield from a number of async coroutines. asyncio's as_completed is close to what I'm looking for (i.e. I want any of the coroutines to be able to yield back to the caller at any time and then continue), but it only seems to accept regular coroutines with a single return value.

Here's what I have so far:

import asyncio


async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_


async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return


async def main():
    runs = [test(i) for i in range(3)]

    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Replacing runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)], and having for i in asyncio.as_completed(runs) iterate over each yield, is what I'm after.

Is this possible to express in Python, and are there any third-party libraries that give you more options than the standard library for coroutine control flow?

Thanks

Triaxial answered 7/6, 2017 at 7:8 Comment(0)

You can use aiostream.stream.merge:

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

Run it in a safe context to make sure the generators are cleaned up properly after the iteration:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

Or make it more compact using pipes:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

More examples in the documentation.


Addressing @nirvana-msu's comment

It is possible to identify the generator that yielded a given value by preparing sources accordingly:

async def main():
    runs = [test_gen(i) for i in range(3)]
    # Bind i at definition time (i=i) to avoid the late-binding closure
    # pitfall, where every lambda would otherwise see the final value of i
    sources = [stream.map(xs, lambda x, i=i: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')
Haber answered 7/6, 2017 at 12:55 Comment(7)
this looks spot on, thank you. I'll give it a try later this evening and award you the answer.Triaxial
@Vincent: This is exactly what I was looking for! I considered using aioreactive as well, but aiostream's source code is much more compact, and it seems a lot easier to use and understand. Very good documentation as well! Thanks a lot :)Exanimate
Is there a way to know which generator the current value was yielded from?Pachston
One more question, if I may. Not sure if aiostream can help here, but how do I solve the same question in a simpler case - when I just have an iterable of coroutines (and not an iterable of async generators). I.e., just as in OP's example, when I do for i in asyncio.as_completed(runs) - how can I find out which coroutine the value was yielded from? Not sure if stream.map is relevant as we're not dealing with async generators here. I can manually wrap my async function in another one, but I'm wondering if there's a helper similar to stream.map.Pachston
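The manual wrapper the comment describes can be quite small. In this sketch, the `tagged` helper and the `work` coroutine are illustrative names (not part of asyncio or aiostream); the wrapper simply returns the tag alongside the awaited result so asyncio.as_completed can report which coroutine finished:

```python
import asyncio


async def work(id_):
    # Stand-in for the question's `test` coroutine
    await asyncio.sleep(id_ * 0.01)
    return id_ * 10


async def tagged(tag, coro):
    # Wrap a plain coroutine so its result carries an identifying tag
    return tag, await coro


async def main():
    runs = [tagged(i, work(i)) for i in range(3)]
    results = []
    for fut in asyncio.as_completed(runs):
        tag, value = await fut
        results.append((tag, value))
        print(f'coroutine {tag} returned {value}')
    return results


results = asyncio.run(main())
```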
If you want to avoid warnings such as AsyncIteratorContext is iterated outside of its context, you must also protect the async for inside an async with aiostream.stream.merge(*sources).stream() as stream. See github.com/vxgmichel/aiostream/issues/46 and github.com/dabeaz/curio/issues/176 for context.Macerate
This helped. Thanks! Just a quick one: in my observation, aiostream.stream.merge is much faster than merged.stream(). Are there any considerable practical downsides to using aiostream.stream.merge despite the UserWarning? UserWarning: Streamer is iterated outside of its context async for e in stream.merge(*tasks)Mallorca
@RewanthTammana The main downside of not using the async context is that you might still have tasks running in the background when leaving the iteration. The async context ensures that all the started tasks are either finished or cancelled. I would not expect a huge performance difference between the two approaches, outside of obvious cases like leaving the for-loop before all items are produced. Feel free to report an issue on the github repository with the timing you measured.Haber