Using a semaphore with asyncio in Python
Asked Answered
E

2

10

I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:

import asyncio


async def send(i):

    print(f"starting {i}")
    await asyncio.sleep(4)
    print(f"ending {i}")


async def helper():
    async with asyncio.Semaphore(value=5):
        await asyncio.gather(*[
            send(1),
            send(2),
            send(3),
            send(4),
            send(5),
            send(6),
            send(7),
            send(8),
            send(9),
            send(10),
        ])


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(helper())
    loop.close()

The output is:

starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10

I hope and expect that only 5 will run at time, however all 10 start and stop at the same time. What am I doing wrong?

Essex answered 20/3, 2021 at 18:2 Comment(1)
Only one acquire call is made on entering the context manager. Rather than using gather, you will need to loop through the requests making them one at a time with their on context manager each I think.Durango
J
15

Please find the working example below, feel free to ask questions:

import asyncio


async def send(i: int, semaphore: asyncio.Semaphore):
    # to demonstrate that all tasks start nearly together
    print(f"Hello: {i}")
    # only two tasks can run code inside the block below simultaneously
    async with semaphore:
        print(f"starting {i}")
        await asyncio.sleep(4)
        print(f"ending {i}")


async def async_main():
    s = asyncio.Semaphore(value=2)
    await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())
    loop.close()

VERSION FROM 18.08.2023:

I see that many people are interested in how to use asyncio.Semaphore and I decided to extend my answer.

The new version illustrates how to use procuder-consumers pattern with asyncio.Semaphore. If you want something very simple, you are fine to use code from the original answer above. If you want more robust solution, which allows to limit number of asyncio.Tasks to work with, you can use this more robust solution.

import asyncio
from typing import List

CONSUMERS_NUMBER = 10  # workers/consumer number
TASKS_NUMBER = 20  # number of tasks to do


async def producer(tasks_to_do: List[int], q: asyncio.Queue) -> None:
    print(f"Producer started working!")
    for task in tasks_to_do:
        await q.put(task)  # put tasks to Queue

    # poison pill technique
    for _ in range(CONSUMERS_NUMBER):
        await q.put(None)  # put poison pill to all worker/consumers

    print("Producer finished working!")


async def consumer(
        consumer_name: str,
        q: asyncio.Queue,
        semaphore: asyncio.Semaphore,
) -> None:
    print(f"{consumer_name} started working!")
    while True:
        task = await q.get()

        if task is None:  # stop if poison pill was received
            break

        print(f"{consumer_name} took {task} from queue!")

        # number of tasks which could be processed simultaneously
        # is limited by semaphore
        async with semaphore:
            print(f"{consumer_name} started working with {task}!")
            await asyncio.sleep(4)
            print(f"{consumer_name} finished working with {task}!")

    print(f"{consumer_name} finished working!")


async def async_main() -> None:
    """Main entrypoint of async app."""
    tasks = [f"TheTask#{i + 1}" for i in range(TASKS_NUMBER)]
    q = asyncio.Queue(maxsize=2)
    s = asyncio.Semaphore(value=2)
    consumers = [
        consumer(
            consumer_name=f"Consumer#{i + 1}",
            q=q,
            semaphore=s,
        ) for i in range(CONSUMERS_NUMBER)
    ]
    await asyncio.gather(producer(tasks_to_do=tasks, q=q), *consumers)


if __name__ == "__main__":
    asyncio.run(async_main())


Jerkin answered 20/3, 2021 at 18:13 Comment(0)
P
3

I don't like the accepted answer (or at-least the current version of it). It doesn't "limit the number of simultaneous async functions running" that the question asks for. Consequently it doesn't scale - imagine a producer with millions of records instead of just 10 in the example used. Pretty sure you don't want a million entries in the event loop.

Instead we should limit the creation of the task itself, not limit the work done by the task once it's already created. Here's an example code that would work:

import asyncio
import sys
import typing
from collections.abc import Awaitable, Iterable


ConsumerId: typing.TypeAlias = int

tasks: dict[ConsumerId, Awaitable] = {}
sem = asyncio.Semaphore(4)

async def consumer(data, consumer_id: ConsumerId):
    try:
        print(f'Starting consumer {consumer_id}')
        print(f'Working on "{data}" ...')
        await asyncio.sleep(2)
        print(f'Exiting consumer {consumer_id}')
    finally:
        sem.release()
        # If we exited cleanly, just remove ourselves from `tasks` dict because nobody needs to await us.
        if not sys.exc_info()[0]:
            del tasks[consumer_id]

async def main(producer: Iterable):
    for consumer_id, data in enumerate(producer):
        await sem.acquire()
        consumer_task = asyncio.create_task(consumer(data, consumer_id = consumer_id))
        tasks[consumer_id] = consumer_task

    # At this point just await all the remaining tasks which are either still running or had exited with exception
    for coro in asyncio.as_completed(tasks.values()):
        try:
            await coro
        except Exception as exc:
            print(f'Do something about {exc}')

producer = range(100)
asyncio.run(main(producer = producer))

If you don't await a task that exited with an exception you get:

Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consumer() done, defined at [...]

If you don't care about handling exceptions or want to handle the exception fully in the consumer itself (eg. just log it an move on) etc. then just del tasks[consumer_id] in finally unconditionally and that should keep the footprint low if a lot of consumers are prone to exit with an exception.

Proleg answered 2/7, 2023 at 21:4 Comment(2)
Hi, thanks for feedback to my answer, I improved it based on your suggests. I also ran you code, on Windows machine it gives error: 'RuntimeError: Task <Task pending name='Task-1' coro=<main() running at C:/Users/akozyrev/PycharmProjects/bgate-ng/88899.py:24> cb=[_run_until_complete_cb() at C:\Users\akozyrev\AppData\Local\Programs\Python\Python38\lib\asyncio\base_events.py:184]> got Future <Future pending> attached to a different loop'. You need to restructure code and move declaration of sem (probably some other declarations) to another place.Jerkin
@ArtiomKozyrev yeah, cheers, i don't have a Win machine and I'm not even sure i ran it at all. The point was to show how to design this stuff (or how it should be conceptually). Currently ut runs fine on ideone though: ideone.com/a4tsLZ so that should suffice in general for the intended purpose.Proleg

© 2022 - 2025 — McMap. All rights reserved.