How to limit concurrency with Python asyncio?

Let's assume we have a bunch of links to download, and each link may take a different amount of time to download. I'm allowed to use at most 3 connections. Now, I want to ensure that I do this efficiently using asyncio.

Here's what I'm trying to achieve: at any point in time, try to ensure that I have at least 3 downloads running.

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

The numbers represent the download links, while the hyphens represent time spent waiting for the download.

Here is the code that I'm using right now

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

And the output is as expected:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

But here are my questions:

  1. At the moment, I'm simply waiting for 9 seconds to keep the main function running till the downloads are complete. Is there an efficient way of waiting for the last download to complete before exiting the main function? (I know there's asyncio.wait, but I'll need to store all the task references for it to work)

  2. What's a good library for this kind of task? I know JavaScript has a lot of async libraries, but what about Python?

Edit: 2. What's a good library that takes care of common async patterns? (Something like JavaScript's async package.)

Terrilynterrine asked 28/1, 2018 at 5:8 Comment(1)
For your particular use case, use aiohttp, which already has a setting to limit the max number of connections. https://mcmap.net/q/161275/-aiohttp-set-maximum-number-of-requests-per-second Ratchet
203

If I'm not mistaken, you're searching for asyncio.Semaphore. Example usage:

import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

Output:

downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6

An example of async downloading with aiohttp can be found here. Note that aiohttp has a Semaphore equivalent built in, which you can see an example of here. It has a default limit of 100 connections.
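
For illustration, here is a minimal sketch of that built-in limit (assuming the standard aiohttp client API; the URLs are placeholders):

import asyncio
import aiohttp

async def fetch(session, url):
    # Requests share the session's connection pool
    async with session.get(url) as resp:
        return await resp.text()

async def main(urls):
    # TCPConnector(limit=3) caps the number of simultaneous connections (default is 100)
    connector = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=connector) as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))

asyncio.run(main(['https://example.com'] * 9))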

Decerebrate answered 28/1, 2018 at 12:52 Comment(9)
Is there a good Python async library to deal with common async programming patterns? Like the famous async package for JavaScript.Terrilynterrine
@Terrilynterrine from my experience asyncio itself contains all you usually need. Take a look at synchronization primitives and at module's functions in general.Decerebrate
@MikhailGerasimov calling asyncio.ensure_future() is redundant as asyncio.gather() calls it internally anyway (source). However, then calling the variable tasks would be "wrong", because these are not tasks yet.Cry
Does asyncio.Semaphore(3) mean you end up with 3 requests per second? Or is it something different?Juxon
@politicalscientist it means that not more than 3 requests can be active simultaneously at any given point of time.Decerebrate
Unfortunately this approach leaves an unbounded number of tasks started on the current loop. It would be nice to only start them as needed.Spirillum
THIS WILL STOP SCALING AT 1k-10k TASKS. This adds all the tasks to the event loop at the beginning, so the event loop will spend most of its time in the round robin scheduler trying to find the next task to run, instead of actually running tasks! What you want to do is limit the number of tasks in the event loop, like in this answer: https://mcmap.net/q/158893/-how-to-limit-concurrency-with-python-asyncioChadbourne
Thanks for this answer. Why doesn't it work with asyncio.run(main())? Error: RuntimeError: Task <Task pending coro=<task.<locals>._decorate.<locals>.wrapper.<locals>._inner() running at /home/hadoop/.local/lib/python3.7/site-packages/aiodag/task_decorator.py:71> cb=[gather.<locals>._done_callback() at /usr/lib64/python3.7/asyncio/tasks.py:691]> got Future <Future pending> attached to a different loopBabysitter
@Babysitter hm, works on Python 3.10 for me. I don't know if 3.7 handles it differently, but you can try moving creating of anything asyncio-related inside main() function to be sure every task is created while the correct event loop is running.Decerebrate
159

I used Mikhail Gerasimov's answer and ended up with this little gem

async def gather_with_concurrency(n, *coros):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro
    return await asyncio.gather(*(sem_coro(c) for c in coros))

Which you would run instead of a normal gather:

await gather_with_concurrency(100, *my_coroutines)
Perform answered 28/4, 2020 at 10:57 Comment(7)
Seeing a function within a function, my mind immediately went to decorators. I had a little play and you can implement this with decorators, either with a fixed semaphore value or dynamic; however, the solution here offers far more flexibility.Orbit
for me to work I had to modify "return await task" for "return await asyncio.create_task(task)" and pass a list of coroutines as tasks.Dicot
@Andrei what could be the Semaphore number that I can give for processing 30k http requests for a min? Is there any hard and fast rule?Pessimist
The tasks parameter of gather_with_concurrency is a bit misleading, it implies that you can use the function with several Tasks created with asyncio.create_task. However in that case it doesn't work, as create_task is actually executing the coroutine right away in the event loop. As gather_with_concurrency is expecting coroutines, the parameter should rather be named coros.Heinrike
It would be helpful to see a version of this that works with tasks as well as coroutines.Waylonwayman
I think "task" was confusing so I've renamed everything to "coro". When you create a task it gets started right away so it's actually a future. I don't believe you want to use this function for futures.Perform
@Waylonwayman This approach can't work on tasks by design. The whole idea is that the coroutine you're invoking has no idea that this is happening, and that the waiting is handled by gather_with_concurrency. This is possible with coroutines, which are by definition not running until you submit them to the event loop (i.e. create a task out of them). If you already have a task, it means that the coroutine has already started running, and your async with will be useless. You could of course add the async with to the task itself, but then you don't need gather_with_concurrency to begin with.Od
70

Before reading the rest of this answer, please note that the idiomatic way of limiting the number of parallel tasks with asyncio is to use asyncio.Semaphore, as shown in Mikhail's answer and elegantly encapsulated in Andrei's answer. This answer contains working, but somewhat more complicated, ways of achieving the same thing. I am leaving the answer because in some cases this approach can have advantages over a semaphore, specifically when the number of items to process is very large or unbounded and you cannot create all the coroutines in advance. In that case the second (queue-based) solution in this answer is what you want. But in most everyday situations, such as parallel downloads through aiohttp, one should use a semaphore instead.


You basically need a fixed-size pool of download tasks. asyncio doesn't come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and don't allow it to grow past the limit. Although the question states your reluctance to go down that route, the code ends up much more elegant:

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

An alternative is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work using an asyncio.Queue. This removes the need to manually limit the number of downloads, which will be automatically limited by the number of coroutines invoking download():

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

As for your other question, the obvious choice would be aiohttp.

Od answered 28/1, 2018 at 8:42 Comment(15)
The first approach works very well and I need not create and store all the task references in advance (I use a generator to lazily load the download links). I did not know asyncio.wait had a "return_when" parameter.Terrilynterrine
@Terrilynterrine In the second solution you only create the three coroutines for downloading in advance, the actual download links can also be generated lazily. But it's a matter of taste - I think I would also prefer the first solution in practice.Od
@OrangeDog That is actually intentional, because the OP's code was using manual while loops. The idea was to adapt their existing code (preserving the non-conventional idiom) to the desired semantics.Od
"The Semaphore is deprecated since version 3.8 and will be removed in version 3.10," the official warning reads. Instead they are asking to use loop. But how to use it? Can anyone provide an example?Schleiermacher
@Schleiermacher Since you don't provide code or the exact error message, it's hard to tell what you're referring to, but rest assured that asyncio.Semaphore is not deprecated. What is deprecated and will be removed is the loop parameter to its constructor, which you can omit and everything will work just fine. (This is not specific to semaphores, the loop parameter is being removed across the board.)Od
@Od sorry my bad. you're right. I got confused in doc.Schleiermacher
@18augst Would you discuss the edit in a comment? The changes you proposed should not be necessary.Od
The second approach seems faster in theory but fails in practice. The first approach beats the second one, being 2-3 times faster.Semanteme
@AhmetK I find such a difference very unlikely, and probably a result of a flaw in the implementation. It's hard to tell without access to the code used to benchmark both cases.Od
For me it was async requests. I used httpx, and the first one was making 10 requests at the same time, but the second one did not seem to do that.Semanteme
@AhmetK You should accompany such claims with code (perhaps posting a separate question). It is most likely that your second code has a problem that prevented it from running in parallel.Od
I wouldn't say that using semaphores for this use case is the most idiomatic. We just happen to be hypnotized by the beauty of async with semaphore - but as long as you're in control of the loop that schedules tasks, introducing a shared state and creating at once all tasks that all wait on it is actually wasteful. I find the first part of this answer the most effective ( yet with a preference to AioPool for its brevity ). The queue is nice, but it's also an unnecessary shared resource, in that specific case of being given an iterable of coroutines to execute.Allsun
@Allsun It's idiomatic in the sense of it being an idiom that is widely used, universally recognized, and frequently recommended. You can argue that it's not the optimal solution for all circumstances, but that's why there are different answers with different approaches.Od
no, it's not, it's even worse to state it that way - looking familiar doesn't imply it's correct. What I mean here is that, for that problem statement, recommending semaphores just because they are generally used to solve the more general problem of limiting access to a shared resource is not good advice.Allsun
@Allsun We can agree to disagree about the use of the term idiomatic, I don't care to argue that point. As for whether the approach is correct, it depends on what you're doing. As long as the number of tasks is bounded, there should be no problem in creating them in advance. Asyncio tasks are lightweight, and being able to create many of them was one of the motivators for providing the library.Od
20

The asyncio-pool library does exactly what you need.

https://pypi.org/project/asyncio-pool/

import asyncio
from asyncio_pool import AioPool

LIST_OF_URLS = ("http://www.google.com", "......")

async def main():
    # AioPool(size=3) runs at most 3 coroutines concurrently
    pool = AioPool(size=3)
    await pool.map(your_download_coroutine, LIST_OF_URLS)

asyncio.run(main())
Deeplaid answered 6/8, 2019 at 18:19 Comment(0)
12

If you have a generator producing your tasks, there may be more tasks than you can fit in memory simultaneously.

The classic asyncio.Semaphore context-manager pattern races all tasks into memory simultaneously.

I don't like the asyncio.Queue pattern. You can prevent it from preloading all the tasks into memory (by setting maxsize=1), but it still requires boilerplate to define, start up and shut down the worker coroutines (which consume from the queue), and you have to ensure a worker won't fail if a task throws an exception. It feels unpythonic, as if implementing your own multiprocessing.pool.

Instead, here is an alternative:

sem = asyncio.Semaphore(n := 5) # specify maximum concurrency

async def task_wrapper(args):
    try:
        await my_task(*args)
    finally:
        sem.release()

for args in my_generator: # may yield too many to list
    await sem.acquire() 
    asyncio.create_task(task_wrapper(args))

# wait for all tasks to complete
for i in range(n):
    await sem.acquire()

This pauses the generator when there are enough active tasks, and lets the event loop clean up finished tasks. Note, for older python versions, replace create_task with ensure_future.

Ratchet answered 23/6, 2021 at 15:42 Comment(1)
I like the brevity of this approach (at least compared to queue-based code). The downside is that it's a bit hard to follow - I had to read it several times to understand what it's doing and convince myself that it's not flawed.Od
9

Small Update: It's no longer necessary to create the loop. I tweaked the code below. Just cleans things up slightly.

# download(code) is the same

async def main():
    no_concurrent = 3
    dltasks = set()
    for i in range(9):
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(asyncio.create_task(download(i)))
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

if __name__ == '__main__':
    asyncio.run(main())
Tragedian answered 14/11, 2019 at 16:52 Comment(2)
Wow, this is what I'm looking for. Thank you very much for this. Is this the latest method? Because I've been using tasks.append(task) and asyncio.gather(*tasks); I just now learned about asyncio.wait.Dion
I really like this approach, simple and intuitive, easy to unpack done tasks etc. as well.Ironware
9

Using a semaphore, you can also create a decorator to wrap the function:

import asyncio
from functools import wraps
def request_concurrency_limit_decorator(limit=3):
    # The semaphore is created when the decorator is applied; on older Python
    # versions it is bound to the event loop that is current at that moment
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor

Then, add the decorator to the original download function.

@request_concurrency_limit_decorator(limit=...)
async def download(...):
    ...

Now you can call the download function as before, but with the Semaphore limiting the concurrency.

await download(...)

It should be noted that when the decorator is applied, the created Semaphore is bound to the default event loop, so you cannot call asyncio.run to create a new loop. Instead, call asyncio.get_event_loop().run... to use the default event loop.

Related: asyncio.Semaphore RuntimeError: Task got Future attached to a different loop
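
For example, a minimal usage sketch of that advice (assuming a download(code) signature like the one in the question):

async def run_all():
    # download() here is the decorated coroutine
    await asyncio.gather(*(download(i) for i in range(9)))

# Use the default event loop so the decorator's Semaphore stays attached to it;
# asyncio.run() would create a fresh loop and trigger the error mentioned above.
loop = asyncio.get_event_loop()
loop.run_until_complete(run_all())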

Ardine answered 20/2, 2021 at 9:18 Comment(0)
3

The accepted answer creates a task for every piece of work right away. As has been pointed out, this means creating a lot of tasks simultaneously, even if the semaphore causes the actual work to be done with less parallelism. It also doesn't fare well in situations where pieces of work are read by different processes, where one wants each process to only bite off as many items as it can currently chew.

In a situation where there's a source of work to be done by these tasks, a better approach is to block on reading from that source until there are free task slots available. This is achieved by the following code:

import asyncio


class TaskQueue:
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.sem = asyncio.BoundedSemaphore(maxsize)
        self.tasks = set()

    def __len__(self):
        return len(self.tasks)

    def _task_done(self, task):
        self.sem.release()
        self.tasks.discard(task)

    async def put(self, coroutine):
        await self.sem.acquire()
        task = asyncio.create_task(coroutine)
        self.tasks.add(task)
        task.add_done_callback(self._task_done)

    async def pending(self):
        pending = self.tasks
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield len(self)

The pending() method is a bonus for when one wants to track the progress of currently pending tasks. Example usage:

tasks = TaskQueue(MAX_CONCURRENT_TASKS)
async for work_item in work_iter():  # E.g., a database or a queue.
    await tasks.put(do_work(work_item))

print(f"All tasks enqueued. Currently {len(tasks)} task(s) pending.")
async for i in tasks.pending():
    print("Number of pending tasks:", i)

One could also add a join() method that just does await asyncio.gather(*self.tasks).
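
A minimal sketch of such a method, following the suggestion above (it would be added to the TaskQueue class):

    async def join(self):
        # Wait for every task currently tracked by the queue to finish
        await asyncio.gather(*self.tasks)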

Bidet answered 11/7, 2023 at 10:44 Comment(1)
I really like this answer. Using the TaskQueue is very easy and it does exactly what I was looking for. In my case, reading from an input queue that never ends, allows me to process X amount of items in parallel (async), consuming another item from the input queue for each processed item. When the input queue is empty, the code will just wait for more items and process those as well.Hercegovina
0

When using FastAPI on Windows, we can be limited in the number of concurrent connections, since the default is 64 (defined by the variable FD_SETSIZE).

More info at https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select?redirectedfrom=MSDN

Despite defining ProactorEventLoop (which uses IOCP), on Python versions prior to 3.7 the select() routines are used, leading to exceptions.

One alternative is to use Andrei's answer to limit the number of concurrent connections in an ML/DL context. Using asyncio + hypercorn + FastAPI, the code is the following:

from hypercorn.config import Config
from hypercorn.asyncio import serve
from fastapi import FastAPI
import asyncio
import logging
import json
import time
import sys

logger = logging.getLogger(__name__)

app = FastAPI()
conn_limit = 10

async def gather_with_concurrency(n, *coros):
    """
    From Andrei's answer
    """
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro
    return await asyncio.gather(*(sem_coro(c) for c in coros))


@app.get('/app/test')
def req_test():
    time.sleep(1)
    return {"test": "ok"}


if __name__ == "__main__":
    # Start the loop
    config = Config()
    config.bind = [f"0.0.0.0:12000"]
    config.workers = 1
    if sys.platform == 'win32':
        logger.info("Setting proactor event loop for Windows platform.")
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(gather_with_concurrency(conn_limit, serve(app, config)))
    loop.close()

Obs: This script was tested on Python 3.7.16 against 1000 workers on Apache JMeter.

Fishhook answered 24/2, 2023 at 18:50 Comment(0)
0

I like using aiometer for limiting concurrency, so maybe this helps someone stumbling across the post. aiometer also has good documentation on GitHub and a Wiki page. Here's a possible solution to the original question, although I have provided it without an in-depth understanding of asyncio.Event() etc. - which I am yet to learn - hence the warning at the end.

from random import randint
import asyncio
import aiometer
import functools


async def download(code, some_other_arg=None):
    wait_time = randint(1, 5)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    return randint(1, 100) #return some random data

tasks = [download(i) for i in range(9)]


async def main():
    async with aiometer.amap(
        functools.partial(download, some_other_arg='foo'),
        tasks,
        max_at_once=3, # Limit maximum number of concurrently running tasks.
        max_per_second=100,  # Limit request rate to not overload the server.
    ) as results:
        async for data in results:
            print("Data returned: ", data)

if __name__ == '__main__':
    asyncio.run(main())

This does, however, produce a RuntimeWarning which I couldn't yet figure out, and which in a bigger project may be a dealbreaker:

sys:1: RuntimeWarning: coroutine 'download' was never awaited

If someone knows how to fix the above error, it would help, but the code still seems to run fine nonetheless. I tried this substitution in the task creation part, but it didn't run at all, with the same error:

tasks = [asyncio.create_task(download(i)) for i in range(9)]
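
One possible cause (this is an assumption, not verified against aiometer's internals): the list comprehension already creates coroutine objects, which amap then passes to download as its code argument instead of awaiting them, hence the "never awaited" warning. A hedged sketch of passing plain values instead and letting aiometer call download itself:

# Hypothetical variant: pass plain arguments and let aiometer invoke download()
args = list(range(9))

async def main():
    async with aiometer.amap(
        functools.partial(download, some_other_arg='foo'),
        args,
        max_at_once=3,
        max_per_second=100,
    ) as results:
        async for data in results:
            print("Data returned: ", data)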
Borisborja answered 7/7, 2023 at 9:15 Comment(0)
