Python async coroutines in batches
I'd like to process coroutines in batches like:

import asyncio

async def test(i):
    print(f"Start Task {i}")
    await asyncio.sleep(0.1)
    print(f"Finished Task {i}")

async def main():
    for i in range(10):
        await asyncio.gather(*[test(10*i+j) for j in range(10)])


asyncio.run(main())

Is there a way to do this with Python builtins or libraries so that I do not have to create the batches individually?

Unfortunately

async with asyncio.Semaphore(10):
    await asyncio.gather(*[test(i) for i in range(100)])

isn't processing the coroutines as expected: the coroutine objects are all created at once, and only their execution is limited. I don't want to create all tasks up front; the tasks should be created in batches.

Bornie answered 25/6, 2021 at 21:25 Comment(8)
How exactly are the coroutines being unexpectedly processed? I tested this code and noticed that the prints return in a disordered but oddly not-random order. I think this is just due to how concurrency works in Python, but if this is the problem, I would suggest basing your outputs on the function's return values: assign the result of asyncio.gather to a variable and then print that.Jerrome
If by "isn't processing the coroutines as expected" you mean that results are returned in order, this is because gather returns results in the same order the coroutines were passed to it, so you'll see task 1 first, task 2 second, etc. The tasks are actually processed concurrently in 10 batches; try changing to wait and you'll see the results get randomised, since wait does not preserve result order.Domiciliate
Sorry for being imprecise. I updated my question.Bornie
The principle of gather is to run the coroutines concurrently, hence the behaviour you observe is correct. I don't see the problem with using a for loop to create your batches. Otherwise, try reading the asyncio task docs: docs.python.org/3/library/asyncio-task.html#id10Unbend
Do you want to run a batch of 10 coroutines until they all exit, then next 10, then next 10? Or do you prefer to start 10 coroutines, and when one of them exits, start another to keep the count limited to 10? I'm asking because your question and your own answer do not match in this respect.Regularize
@Regularize Your objection is justified. Processing in batches was my first thought, until I came across the solution of limiting the coroutines to 10 concurrent tasks, so that a new one is started as soon as one has finished.Bornie
@finefoot If I have many tasks, creating them all at once will consume a lot of time and memory. Also, if I limit the semaphore to 1, I can debug easily.Bornie
@Bornie That's a rational explanation. A year ago I answered a somehow similar question, it also uses a semaphore: https://mcmap.net/q/1220749/-python-asyncio-runtimeerror-await-wasn-39-t-used-with-futureRegularize
This is what I was looking for:

import asyncio

from aiodecorators import Semaphore


@Semaphore(10)
async def test(i):
    print(f"Start Task {i}")
    await asyncio.sleep(0.1)
    print(f"Finished Task {i}")


async def main():
    await asyncio.gather(*[test(i) for i in range(100)])


asyncio.run(main())
Bornie answered 29/6, 2021 at 13:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.