I would like to read from multiple simultaneous HTTP streaming requests inside coroutines using httpx, and yield the data back to my non-async function running the event loop, rather than just returning the final data.
But if I make my async functions yield instead of return, I get complaints that `asyncio.as_completed()` and `loop.run_until_complete()` expect a coroutine or a Future, not an async generator.
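The complaint comes from the fact that an async generator object is not itself awaitable, while each call to its `__anext__()` method does return an awaitable. A minimal sketch (the `lines()` generator is a hypothetical stand-in for a streamed response, not part of httpx) showing that synchronous code can therefore pull one item at a time through `run_until_complete()`:

```python
import asyncio
import inspect


async def lines():
    """Hypothetical async generator standing in for a streamed response."""
    for line in ("first", "second"):
        await asyncio.sleep(0)  # pretend to wait on the network
        yield line


agen = lines()
# The async generator object itself is not awaitable, which is why
# run_until_complete() and as_completed() reject it ...
print(inspect.isawaitable(agen))  # False
# ... but each step of it is: __anext__() returns an awaitable.
step = agen.__anext__()
print(inspect.isawaitable(step))  # True

loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(step))               # first
    print(loop.run_until_complete(agen.__anext__()))   # second
finally:
    # Give the generator a chance to run its cleanup before the loop closes.
    loop.run_until_complete(agen.aclose())
    loop.close()
```

Once the generator is exhausted, the next `__anext__()` raises `StopAsyncIteration` out of `run_until_complete()`, which a sync wrapper can turn into the end of an ordinary generator.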
So the only way I can get this to work at all is to collect all the streamed data inside each coroutine and return it once the request finishes, then collect all the coroutine results and finally return them to the non-async calling function.
This means I have to keep everything in memory and wait until the slowest request has completed before I get any of my data, which defeats the whole point of streaming HTTP requests.
Is there any way I can accomplish something like this? My current silly implementation looks like this:
```python
import asyncio

import httpx


def collect_data(urls):
    """Non-async function wishing it was a non-async generator"""

    async def stream(async_client, url):
        data = []
        async with async_client.stream("GET", url=url) as ar:
            ar.raise_for_status()
            async for line in ar.aiter_lines():
                data.append(line)
                # would like to yield each line here
        return data

    async def execute_tasks(urls):
        all_data = []
        async with httpx.AsyncClient() as async_client:
            tasks = [stream(async_client, url) for url in urls]
            for coroutine in asyncio.as_completed(tasks):
                all_data += await coroutine
                # would like to iterate and yield each line here
        return all_data

    loop = asyncio.new_event_loop()
    try:
        data = loop.run_until_complete(execute_tasks(urls=urls))
        # would like to iterate and yield the data here as it becomes available
        return data
    finally:
        loop.close()
```
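One possible way to get what the comments in that code ask for, sketched under assumptions: each request is written as an async generator, a hypothetical `merge()` helper fans them into a single `asyncio.Queue` so items come out in arrival order, and a synchronous `collect_data()` generator runs the event loop only until the next item is ready. `fake_stream()` is a made-up placeholder that simulates a response with `asyncio.sleep()`; its docstring shows roughly where httpx's `AsyncClient.stream()` and `aiter_lines()` would go. This is an illustration, not a tested drop-in replacement.

```python
import asyncio

_DONE = object()  # sentinel: one producer has finished


async def fake_stream(name, delays):
    """Hypothetical stand-in for one streamed HTTP response.

    With httpx the body would be along the lines of:
        async with client.stream("GET", url) as r:
            r.raise_for_status()
            async for line in r.aiter_lines():
                yield line
    """
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield f"{name}:{i}"


async def merge(*agens):
    """Yield items from several async generators in arrival order."""
    queue = asyncio.Queue()

    async def pump(agen):
        try:
            async for item in agen:
                await queue.put((False, item))
        except Exception as exc:  # forward errors to the consumer
            await queue.put((True, exc))
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(pump(agen)) for agen in agens]
    remaining = len(tasks)
    try:
        while remaining:
            entry = await queue.get()
            if entry is _DONE:
                remaining -= 1
                continue
            failed, value = entry
            if failed:
                raise value
            yield value
    finally:
        # Stop producers that are still running (e.g. the consumer quit early).
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


def collect_data(agens):
    """Synchronous generator: yields each item as soon as any stream has one."""
    loop = asyncio.new_event_loop()
    merged = merge(*agens)
    try:
        while True:
            try:
                # Run the loop just long enough to produce the next item.
                yield loop.run_until_complete(merged.__anext__())
            except StopAsyncIteration:
                return
    finally:
        loop.run_until_complete(merged.aclose())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
```

Because the loop only runs while the caller asks for the next item, the requests only make progress while the consumer is iterating. With real httpx requests, the `AsyncClient` would presumably need to be opened and closed on this same loop, for example inside an async generator passed to `merge()`.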
I've tried some solutions using `asyncio.Queue` and `trio` memory channels as well, but since I can only read from those in an async scope, it doesn't get me any closer to a solution.
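Another sketch, assuming a background thread is acceptable: run the event loop in a worker thread and hand items over through the standard library's `queue.Queue`, which is thread-safe and can be read with a blocking `get()` from ordinary synchronous code, unlike `asyncio.Queue` or a trio memory channel. `stream_in_thread()` and `fake_stream()` are hypothetical names; `asyncio.run_coroutine_threadsafe()` and `loop.run_forever()` are standard asyncio APIs.

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel put by the worker when all producers are done


async def fake_stream(name, n, delay):
    """Hypothetical stand-in for one streamed httpx response."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{name}:{i}"


def stream_in_thread(agens):
    """Synchronous generator fed by async producers on a background loop."""
    q = queue.Queue()
    loop = asyncio.new_event_loop()
    worker = threading.Thread(target=loop.run_forever, daemon=True)
    worker.start()

    async def pump(agen):
        async for item in agen:
            q.put((False, item))  # unbounded queue, so this never blocks the loop

    async def main():
        try:
            await asyncio.gather(*(pump(agen) for agen in agens))
        except Exception as exc:
            q.put((True, exc))  # forward the first error to the consumer
        finally:
            q.put(_DONE)

    future = asyncio.run_coroutine_threadsafe(main(), loop)
    finished = False
    try:
        while True:
            entry = q.get()  # blocks only the calling (sync) thread
            if entry is _DONE:
                finished = True
                return
            failed, value = entry
            if failed:
                raise value
            yield value
    finally:
        if not finished:
            future.cancel()  # consumer quit early or hit an error
            while q.get() is not _DONE:  # wait for main() to unwind
                pass
        asyncio.run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop).result()
        loop.call_soon_threadsafe(loop.stop)
        worker.join()
        loop.close()
```

Here the producers run at network speed regardless of how fast the consumer reads, so the queue can grow without bound; a bounded `queue.Queue(maxsize=...)` would cap memory, but its blocking `put()` would then stall the worker's event loop, so that variant needs more care. Since the result is an ordinary iterator, it could in principle be handed to something like Django's `StreamingHttpResponse`.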
The reason I want to use this from a non-asynchronous generator is that I want to consume it from a Django app through a Django REST Framework streaming API.
`collect_data` inside `stream` method? The way you are trying to do it, i.e. calling something after `loop.run_until_complete`, forces you to store everything in memory, which, as you've correctly noted, is against the idea of streaming. There's no way to avoid this, since `loop.run_until_complete` is not a generator. In other words, if you can't modify `collect_data` and call it inside the `stream` method, then there's not much you can do. – Triggerhappy

`collect_data` have to be sync? Normally, using async code anywhere requires most of the program to be async. – Vacate