Yielding asyncio generator data back from event loop possible?

I would like to read from multiple simultaneous HTTP streaming requests inside coroutines using httpx, and yield the data back to my non-async function running the event loop, rather than just returning the final data.

But if I make my async functions yield instead of return, I get complaints that asyncio.as_completed() and loop.run_until_complete() expect a coroutine or a Future, not an async generator.

So the only way I can get this to work at all is to collect all the streamed data inside each coroutine and return it once the request finishes, then gather all the coroutine results and finally return those to the non-async calling function.

That means I have to keep everything in memory and wait until the slowest request has completed before I get any of my data, which defeats the whole point of streaming HTTP requests.

Is there any way I can accomplish something like this? My current silly implementation looks like this:

import asyncio

import httpx


def collect_data(urls):
    """Non-async function wishing it was a non-async generator"""

    async def stream(async_client, url):
        data = []
        async with async_client.stream("GET", url=url) as ar:
            ar.raise_for_status()
            async for line in ar.aiter_lines():
                data.append(line)
                # would like to yield each line here
        return data

    async def execute_tasks(urls):
        all_data = []
        async with httpx.AsyncClient() as async_client:
            tasks = [stream(async_client, url) for url in urls]
            for coroutine in asyncio.as_completed(tasks):
                all_data += await coroutine
                # would like to iterate and yield each line here
        return all_data

    try:
        loop = asyncio.get_event_loop()
        data = loop.run_until_complete(execute_tasks(urls=urls))
        return data
        # would like to iterate and yield the data here as it becomes available
    finally:
        loop.close()

I've tried some solutions using asyncio.Queue and trio memory channels as well, but since I can only read from those in an async scope, they don't get me any closer to a solution.

The reason I want this as a non-asynchronous generator is that I want to use it from a Django app serving a Django Rest Framework streaming API.
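
For context, the consuming side would look roughly like this - a minimal sketch, where the view and the urls are made up, but django.http.StreamingHttpResponse does accept any sync iterator:

from django.http import StreamingHttpResponse

def stream_view(request):
    # hypothetical view: hand the sync generator straight to Django,
    # which iterates it and streams each chunk to the client
    urls = ["https://example.com/a", "https://example.com/b"]  # made-up URLs
    return StreamingHttpResponse(collect_data(urls), content_type="text/plain")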

Polygnotus answered 25/8, 2020 at 21:56 Comment(3)
I'm not sure I understand. Why can't you just call a variant of collect_data inside the stream method? The way you are trying to do it, i.e. calling something after loop.run_until_complete, forces you to store everything in memory, which, as you've correctly noted, is against the idea of streaming. There's no way to avoid this, since loop.run_until_complete is not a generator. In other words, if you can't modify collect_data and call it inside the stream method, then there's not much you can do.Triggerhappy
Why does collect_data have to be sync? Normally using async code anywhere requires most of the program to be async.Vacate
Updated the question and mentioned that I want to expose this in a Django DRF app running synchronously. It could very well be that the idea of doing this from non-async code is stupid and I need to rethink the whole thing :)Polygnotus

Normally you should just make collect_data async, and use async code throughout - that's how asyncio was designed to be used. But if that's for some reason not feasible, you can iterate an async iterator manually by applying some glue code:

def iter_over_async(ait, loop):
    ait = ait.__aiter__()
    # helper async fn that just gets the next element
    # from the async iterator
    async def get_next():
        try:
            obj = await ait.__anext__()
            return False, obj
        except StopAsyncIteration:
            return True, None
    # actual sync iterator (implemented using a generator)
    while True:
        done, obj = loop.run_until_complete(get_next())
        if done:
            break
        yield obj

The way the above works is by providing an async closure that keeps retrieving values from the async iterator using the __anext__ magic method and returning the objects as they arrive. This async closure is invoked with run_until_complete() in a loop inside an ordinary sync generator. (The closure actually returns a pair of a done flag and the actual object, in order to avoid propagating StopAsyncIteration through run_until_complete, which might be unsupported.)

With this in place, you can make your execute_tasks an async generator (async def with yield) and iterate over it using:

for chunk in iter_over_async(execute_tasks(urls), loop):
    ...
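
For reference, here's a rough sketch of what that async-generator version could look like. asyncio.as_completed can't drive async generators, so this sketch interleaves the per-URL streams with aiostream.stream.merge (which, per the comments below, is what the asker ended up using); stream_lines is the question's stream helper rewritten to yield lines:

import httpx
from aiostream import stream

async def stream_lines(async_client, url):
    # the question's stream(), rewritten as an async generator
    async with async_client.stream("GET", url=url) as ar:
        ar.raise_for_status()
        async for line in ar.aiter_lines():
            yield line

async def execute_tasks(urls):
    async with httpx.AsyncClient() as async_client:
        # merge all line streams into one, yielding lines as they arrive
        merged = stream.merge(*(stream_lines(async_client, url) for url in urls))
        async with merged.stream() as lines:
            async for line in lines:
                yield line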

Just note that this approach is incompatible with asyncio.run, and relies on being able to run the event loop "occasionally" from sync code. That might cause issues in more complex async programs that want to set up background tasks - but for simple code it should be fine.

Vacate answered 26/8, 2020 at 10:18 Comment(3)
This solution seems to work nicely, and thanks for the warning for when I move to python>=3.7. I also ended up using aiostream.stream.merge on the "tasks" to iterate over all the async generators "at the same time".Polygnotus
Nice use of aiostream. :) Note that your code will work fine with Python 3.7 and later, loop.run_until_complete() is not going anywhere. It's just that the general recommendations are shifting towards asyncio.run, so at some point your design might fall behind and you might want to rethink it.Vacate
This is brilliant and exactly what I needed. I adjusted the signature so the loop param is optional, with a default of asyncio.get_event_loop(). Then calling it is even more trivial.Martijn

Just a small update to @user4815162342's solution, using asyncio.run_coroutine_threadsafe instead of loop.run_until_complete:

import asyncio
from typing import Any, AsyncGenerator

def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
    ait = async_generator.__aiter__()

    async def get_next() -> tuple[bool, Any]:
        try:
            obj = await ait.__anext__()
            done = False

        except StopAsyncIteration:
            obj = None
            done = True

        return done, obj

    while True:
        done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()

        if done:
            break

        yield obj
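
Note that asyncio.run_coroutine_threadsafe submits the coroutine to a loop running in another thread, so the loop must already be running there. A minimal usage sketch (ticker is just a stand-in async generator):

import asyncio
import threading

async def ticker():
    # stand-in async generator that produces a few values
    for i in range(3):
        await asyncio.sleep(0.1)
        yield i

# run the event loop in a background thread
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

# consume the async generator from ordinary sync code
for item in _iter_over_async(loop, ticker()):
    print(item)

loop.call_soon_threadsafe(loop.stop)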

I'd also like to add that I have found tools like this quite helpful when piecewise converting synchronous code to asyncio code.

Eolithic answered 10/8, 2022 at 2:7 Comment(0)

There is a nice library that does this (and more!) called pypeln:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
Martijn answered 6/2, 2023 at 3:33 Comment(0)
