Writing web responses to file in an asynchronous program

Working on replacing my implementation of a server query tool that uses ThreadPoolExecutor with fully asynchronous calls using asyncio and aiohttp. Most of the transition is straightforward, since the network calls are non-blocking IO; it's saving the responses that has me in a conundrum.

All the examples I am using, even the docs for both libraries, use asyncio.gather(), which collects all the awaitable results. In my case, these results can be files in the many GB range, and I don't want to store them in memory.

What's an appropriate way to solve this? Is it to use asyncio.as_completed() and then:

from asyncio import as_completed, get_event_loop

for f in as_completed(aws):
    earliest_result = await f
    # Assumes `loop` defined under `if __name__` block outside coroutine
    loop = get_event_loop()
    # Run the blocking IO in an executor and write to file
    _ = await loop.run_in_executor(None, save_result, earliest_result)

Doesn't this introduce a thread (assuming I use a ThreadPoolExecutor by default), thus making this an asynchronous, multi-threaded program rather than an asynchronous, single-threaded one?

Further, does this ensure only one earliest_result is being written to file at any time? I don't want the call to await loop.run_in_executor(...) to be running, then have another result come in and another write start on the same file; I could limit that with a semaphore, I suppose.
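Something like this rough sketch is what I have in mind (write_lock and handle_results are just names I'm making up; save_result and aws are as above):

import asyncio

write_lock = asyncio.Lock()

async def handle_results(aws):
    loop = asyncio.get_event_loop()
    for f in asyncio.as_completed(aws):
        earliest_result = await f
        # Hold the lock so only one blocking write runs in the executor at a time
        async with write_lock:
            await loop.run_in_executor(None, save_result, earliest_result)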

Yarn answered 8/3, 2019 at 0:23 Comment(0)

I'd suggest making use of the aiohttp Streaming API. Write your responses directly to disk instead of RAM, and return the file names rather than the responses themselves from gather. Doing so won't use much memory at all. This is a small demo of what I mean:

import asyncio

import aiofiles
from aiohttp import ClientSession


async def make_request(session, url):
    filename = url.split('/')[-1]
    # Stream the body to disk chunk by chunk instead of buffering it in RAM
    async with session.get(url) as response:
        async with aiofiles.open(filename, "wb") as f:
            async for data in response.content.iter_chunked(1024):
                await f.write(data)
    return filename


async def main():
    urls = ['https://github.com/Tinche/aiofiles',
            'https://github.com/aio-libs/aiohttp']
    async with ClientSession() as session:
        coros = [make_request(session, url) for url in urls]
        result_files = await asyncio.gather(*coros)
    print(result_files)


asyncio.run(main())
Homogeneous answered 23/5, 2019 at 13:54 Comment(0)

Very clever way of using asyncio.gather by @merrydeath. I tweaked the helper function as below and got a big performance boost:

    response = await session.get(url)
    filename = url.split('/')[-1]
    async with aiofiles.open(filename, "wb") as f:
        await f.write(await response.read())

Results may differ depending on the download connection speed. Note that read() loads the entire body into memory, so this trades the streaming behavior of the original for raw speed.
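For completeness, a sketch of the whole tweaked helper (same shape as make_request in the streaming answer above, just without chunking):

async def make_request(session, url):
    async with session.get(url) as response:
        filename = url.split('/')[-1]
        async with aiofiles.open(filename, "wb") as f:
            # Buffers the whole body in RAM, then writes it out in one call
            await f.write(await response.read())
    return filename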

Ansermet answered 22/8, 2022 at 14:47 Comment(0)

In my case, these results can be files in the many GB range, and I don't want to store them in memory.

If I'm right that a single item in aws downloads a single file, you may face the following problem: while as_completed lets you move data from RAM to disk as soon as each download finishes, all your aws run in parallel, each simultaneously holding its buffer (the partly downloaded file) in RAM.

To avoid this, you'll need a semaphore to ensure that not too many files are being downloaded in parallel in the first place, thus preventing RAM overuse.

Here's a minimal sketch of using a semaphore for that (the limit of 3 is arbitrary, and make_request is borrowed from the streaming answer above):
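import asyncio

semaphore = asyncio.Semaphore(3)

async def download_limited(session, url):
    # At most 3 downloads (and hence 3 partial-file buffers) exist at once
    async with semaphore:
        return await make_request(session, url)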

Doesn't this introduce a thread (assuming I use a ThreadPoolExecutor by default), thus making this an asynchronous, multi-threaded program rather than an asynchronous, single-threaded one?

I'm not sure I understand your question, but yes, your code will use threads. Only save_result will be executed inside those threads, though; all other code still runs in the single main thread. Nothing bad about that.

Further, does this ensure only one earliest_result is being written to file at any time?

Yes, it does[*]. To be precise, the await keyword on the last line of your snippet ensures it:

_ = await loop.run_in_executor(None, save_result, earliest_result)

You can read it as: "start executing run_in_executor asynchronously and suspend the execution flow at this line until run_in_executor is done and has returned a result".


[*] Yes, as long as you don't run multiple for f in as_completed(aws) loops in parallel in the first place.

Arlettaarlette answered 8/3, 2019 at 14:54 Comment(0)
