How to speed up async requests in Python

I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million in one go, I tried downloading them in parts, e.g. 10 million at a time, using the code below, but it can only handle about 20,000 at a time (more than that throws an error), so downloading everything becomes very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at a rate of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]. How can I speed this up?

import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i


n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
    print("error")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Traceback (most recent call last):

File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
    raise exceptions[0]
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
    sock = await self._connect_sock(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
    future.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
    ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 136, in <module>
    loop.run_until_complete(fetch())
  File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
    result = coro.send(None)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 88, in fetch
    response = await f
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 37, in _wait_for_one
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 125, in do_get
    async with session.get(url + str(x), headers=headers) as response:
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
    self._resp = await self._coro
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
    conn = await self._connector.connect(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
    raise last_exc
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
Petrochemistry answered 23/2, 2022 at 7:33 Comment(0)

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:

# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should decrease significantly. (On macOS: q = 20_000 decreased 43% from 58 seconds to 33 seconds, and q = 10_000 decreased 42% from 31 to 18 seconds.)

The limit you can configure depends on the number of file descriptors that your machine can open. (On macOS: You can run ulimit -n to check, and ulimit -n 1024 to increase to 1024 for the current terminal session, and then change to limit=1000. Compared to limit=100, q = 20_000 decreased 76% to 14 seconds, and q = 10_000 decreased 71% to 9 seconds.)
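
If you prefer to check and raise that limit from Python rather than the shell, here is a minimal sketch using the standard-library resource module (note: this module is Unix/macOS only and does not exist on Windows):

import resource

# Check the current soft/hard file-descriptor limits for this process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# Raise the soft limit; the new value must not exceed the hard limit.
# Afterwards, pass a matching connector limit, e.g. TCPConnector(limit=1000).
resource.setrlimit(resource.RLIMIT_NOFILE, (1024, hard))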

Supporting 50 million requests: async generators

Next, the reason why 50 million requests appear to hang is simply their sheer number.

Just creating 10 million coroutines in post_tasks takes 68-98 seconds (varies greatly on my machine), and then the event loop is further burdened with that many tasks, 99.99% of which are blocked by the TCP connection pool.
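
To get a rough feel for that overhead in isolation, here is a small timing sketch (not from the original measurement; it just materializes a million coroutine objects and closes them again, and the numbers will vary by machine):

import time

async def noop(i):
    return i

start = time.perf_counter()
coros = [noop(i) for i in range(1_000_000)]
print(f"created {len(coros):,} coroutine objects in {time.perf_counter() - start:.2f}s")

# Close them so Python doesn't warn about coroutines that were never awaited.
for c in coros:
    c.close()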

We can defer the creation of coroutines using an async generator:

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)

We need a counterpart to asyncio.as_completed() to handle async_gen and concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    # Note: the loop is already running when this generator is consumed inside
    # fetch(), so these run_until_complete() calls rely on nest_asyncio.apply()
    # from the question.
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    while todo:                                # +
        yield _wait_for_one()                  # +

Then, we update fetch():

from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                                # -
        # # prepare the coroutines that post                             # -
        # async for x in make_numbers(n, q):                             # -
        #     post_tasks.append(do_get(session, url, x))                 # -
        # Prepare the coroutines generator                               # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                                                                         # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]     # -
        # Now execute them with a specified concurrency                                                        # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
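
Running the updated fetch() stays the same as in the question; note that nest_asyncio.apply() is still required, because as_completed_for_async_gen() calls run_until_complete() while the event loop is already running:

import nest_asyncio
nest_asyncio.apply()

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch())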

Other limitations

With the above, the program can start processing 50 million requests but:

  1. it will still take 8 hours or so with CONCURRENCY = 1000, based on the estimate from tqdm.
  2. your program may run out of memory for responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f
    
    # Do something with response, such as writing to a local file
    # ...
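
For example, a minimal sketch (inside fetch(), replacing the loop above) that appends each body to a newline-delimited file; the file name responses.txt is just a placeholder, and this assumes you don't need the records kept in memory:

with open("responses.txt", "a", encoding="utf-8") as out:
    for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
        response = await f
        # One record per line instead of keeping 50 million strings in a list.
        out.write(response.replace("\n", " ") + "\n")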

An error in the code

do_get() should return data:

async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +
Massif answered 27/2, 2022 at 13:54 Comment(0)

If it is not bandwidth that limits you (I cannot check this), there is a solution less complicated than Celery and RabbitMQ, though not as scalable: it is limited by the number of CPU cores on your machine.

Instead of splitting the calls across Celery workers, you split them across multiple processes.

I modified the fetch function like this:

async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]

and I modified the main process:

import concurrent.futures
from itertools import count

def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except:
        print("error")


if __name__ == '__main__':

    s = time.perf_counter()
    # Change the value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # Overlapping boundary indexes are not a problem because
            # make_numbers uses a half-open range(start, end).
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")

Here are the results I get (with q set to 10_000):

1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.

I didn't work on the tqdm progress bar; with the current solution, two bars will be displayed (but I think tqdm works well with multiple processes).
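
If per-process bars are wanted, one possible tweak is tqdm's position argument. This is only a sketch: worker_index is a hypothetical extra parameter you would thread through executor.submit() and one_executor(), and bars from separate processes can still interleave on some terminals.

async def fetch(start, end, worker_index):
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = [do_get(session, url, x) async for x in make_numbers(start, end)]
        # `position` pins each process's bar to its own row; `desc` labels it.
        for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks),
                           position=worker_index, desc=f"worker {worker_index}"):
            await f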

Leyla answered 26/2, 2022 at 15:51 Comment(5)
RuntimeError: Event loop is closed (Petrochemistry)
Even when I try to get just 100 records, it gets stuck at the last few records and throws the above error. (Petrochemistry)
I was working with Python 3.10 and I replaced get_event_loop with new_event_loop in the one_executor function. Putting get_event_loop back should make everything as it was in your code. (Leyla)
I have no bandwidth limitation, but your solution still does not resolve my problem, as it fetches records at the same speed. (Petrochemistry)
Did you change the value of max_worker? It was set to 1 in the answer, so it is normal that the fetch stays at the same speed. I edited my answer. (Leyla)
