How to limit the amount of concurrent async I/O operations?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /* ... */ };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

The problem is that this starts 1000+ simultaneous web requests. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are being downloaded at any given time? How can this be done in the most efficient manner?

Substage answered 29/5, 2012 at 21:26 Comment(8)
How is this different from your previous question?Parthen
#9290998 With a ParallelOptions parameter.Tellez
@ChrisDisley, this will only parallelize the launching of the requests.Labellum
@Parthen is right, how is it different? btw, I love the answer there https://mcmap.net/q/76439/-how-to-properly-run-multiple-async-tasks-in-parallel-duplicateSuccursal
Besides HttpClient is IDisposable, and you should dispose it, especially when you're going to use 1000+ of them. HttpClient can be used as a singleton for multiple requests.Annulation
@Shimmy you should never dispose HttpClient: https://mcmap.net/q/76441/-do-httpclient-and-httpclienthandler-have-to-be-disposed-between-requestsCheiro
as google gives this as the first result for similar problem in java, see this: https://mcmap.net/q/76442/-maximum-number-of-spring-quot-async-quot-calls-at-a-timeLetterperfect
​As a side note, the HttpClient class is intended to be instantiated once, and reused throughout the life of an application.Gallup
D
218

You can definitely do this in the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments - timeout intervals, cancellation tokens, all of your usual scheduling friends :)

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Last, here's some sample code about how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", /* ... */ };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}
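
As a variation on the code above, here is a minimal sketch that takes two points from the comments into account: a single shared HttpClient, and a local function instead of Task.Run. The name DownloadAllAsync and its parameters are hypothetical, chosen only for illustration. Unlike the loop above, this version creates all wrapper tasks upfront, and each one waits on the semaphore before it starts its request.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// One HttpClient for the whole application (assumption: default settings are fine).
var client = new HttpClient();

// Hypothetical helper: downloads every URL with at most maxConcurrency requests in flight.
static async Task<string[]> DownloadAllAsync(
    HttpClient client, IReadOnlyList<string> urls, int maxConcurrency)
{
    using var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency);

    // Local function: one throttled download.
    async Task<string> DownloadOneAsync(string url)
    {
        await throttler.WaitAsync();
        try
        {
            return await client.GetStringAsync(url);
        }
        finally
        {
            // Free the slot whether the request succeeded or failed.
            throttler.Release();
        }
    }

    // Task.WhenAll returns the results in the same order as the input URLs.
    return await Task.WhenAll(urls.Select(url => DownloadOneAsync(url)));
}
```

Usage would look like `string[] pages = await DownloadAllAsync(client, urls, maxConcurrency: 20);`.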

Last but not least, worth mentioning is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and let a custom task scheduler limit the concurrency. In fact, there's an MSDN sample for it.

See also TaskScheduler.
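
To illustrate the scheduler-based idea, here is a minimal sketch. It does not use the MSDN sample; it swaps in the built-in ConcurrentExclusiveSchedulerPair, whose ConcurrentScheduler caps how many delegates execute at once. The limit of 4 and the Thread.Sleep stand-in for work are assumptions for the demo.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// At most 4 delegates scheduled through this scheduler execute at the same time.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 4);
var factory = new TaskFactory(pair.ConcurrentScheduler);

int running = 0, peak = 0;
var gate = new object();

Task[] tasks = Enumerable.Range(0, 20).Select(i => factory.StartNew(() =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    Thread.Sleep(20); // stand-in for blocking (synchronous) work
    lock (gate) { running--; }
})).ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"Peak concurrency: {peak}"); // never more than 4
```

Keep in mind that a scheduler only limits delegates while they are executing. With an async lambda, every await hands the slot back, so for throttling asynchronous I/O the SemaphoreSlim approach above is the better fit.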

Deshabille answered 30/5, 2012 at 6:1 Comment(15)
Doesn't this code end up creating a list containing as many task objects as there are urls? is there anyway to avoid this?Lynnett
isn't a parallel.foreach with a limited degree of parallelism a nicer approach? msdn.microsoft.com/en-us/library/…Lynnett
Please note that WaitAsync implicitly changes the semaphore's internal counter. I ran into an issue when starting a task for only some, not all, of the elements in the source collection. Make sure you only call WaitAsync when you're actually scheduling a task.Rowley
Why don't you dispose your HttpClient?Annulation
@GreyCloud: Parallel.ForEach works with synchronous code. This allows you to call asynchronous code.Huckaback
@Shimmy, although HttpClient technically inherits from IDisposable, it's not actually doing anything. There is actually no benefit to disposing HttpClient whatsoever.Burrus
@Burrus you're wrong. Besides it's always a good habit to wrap all IDisposables in using or try-finally statements, and assure their disposal.Annulation
Given how popular this answer is, it's worth pointing out that HttpClient can and should be a single common instance rather than an instance per request.Wildon
@RupertRawnsley +1, and of course there is a proof for that on our beloved SO: https://mcmap.net/q/76441/-do-httpclient-and-httpclienthandler-have-to-be-disposed-between-requestsCheiro
What is the benefit of using Task.Run here? I know it's usually used to not block the UI thread, but in here it's hard for me to understand the difference between adding this without Task.Run since, when running this, it seems to be doing the same type of thing.Jackal
@Jackal I was wondering the same thing. As far as I understand, you should only use Task.Run when you have a CPU-intensive task. It seems like you should simply await these calls, so I think you're right. However I'd like someone to triple check my logic.Porche
Task.Run() is necessary here because if you await normally then the requests will be processed one at a time (since it's waiting for the request to finish before continuing the rest of the loop) instead of in parallel. However, if you don't await the request then you will release the semaphore as soon as the task is scheduled (allowing all requests to run at the same time), which defeats the purpose of using it in the first place. The context created by Task.Run is just a place to hold onto the semaphore resource.Forearm
Task.Run() is needed here because the code doesn't create all Tasks at once, but rather schedules 20 active tasks at once. After the loop has completed, there can be 0 to 20 tasks still active (created / running / waiting to run).Decelerate
@Jackal there is hardly any benefit by using Task.Run here, but there is hardly any harm using it either (because the Task.Run method understands async delegates). The alternative would be to use a local function that accepts a url and returns a Task, but local functions were not available at the time this answer was written (C# 7 was released at March 2017).Gallup
would the task.run be necessary if it was an async lambda in urls.select()?Recusancy
D
26

If you have an IEnumerable (i.e. strings of URLs) and you want to perform an I/O-bound operation on each of them (i.e. make an async HTTP request) concurrently, AND optionally you also want to set the maximum number of concurrent I/O requests at run time, here is how you can do that. This way you do not use the thread pool et al.; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding window pattern: one request completes, leaves the semaphore, and the next one gets in.

Usage:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

Implementation:

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
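
If you also need the result of each operation, the same pattern can return values. Below is a minimal sketch under stated assumptions: SelectThrottledAsync is a hypothetical name, and the Task.Delay call only simulates an I/O request. Task.WhenAll returns the results in input order, so no locking on a shared list is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs asyncProcessor for every input, at most
// maxConcurrency at a time, and returns the results in input order.
static async Task<TOut[]> SelectThrottledAsync<TIn, TOut>(
    IEnumerable<TIn> source, Func<TIn, Task<TOut>> asyncProcessor, int maxConcurrency)
{
    var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency);

    IEnumerable<Task<TOut>> tasks = source.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            return await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return await Task.WhenAll(tasks).ConfigureAwait(false);
}

// Demo: square ten numbers with at most 3 operations in flight.
int running = 0, peak = 0;
var gate = new object();

int[] squares = await SelectThrottledAsync(Enumerable.Range(1, 10), async n =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20); // stand-in for an async I/O request
    lock (gate) { running--; }
    return n * n;
}, maxConcurrency: 3);

Console.WriteLine($"Last result: {squares[9]}, peak concurrency: {peak}");
```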
Deeann answered 1/6, 2016 at 12:52 Comment(6)
Do I need to Dispose a SemaphoreSlim?Drayton
no you should not need to explicitly dispose SemaphoreSlim in this implementation and usage as it is used internally inside the method and the method does not access its AvailableWaitHandle property in which case we would have needed to either dispose or wrap it within a using block.Deeann
Just thinking of the best practices and lessons we teach other people. A using would be nice.Drayton
Well, this example I can follow, but I'm trying to work out the best way to do this: basically have a throttler, but my Func would return a list, which I ultimately want merged into a final list of all results when done... which may require locking on the list. Do you have suggestions?Quyenr
you can slightly update the method so it returns the list of actual tasks and you await Task.WhenAll from inside your calling code. Once Task.WhenAll is complete, you can enumerate over each task in the list and add its list to the final list. Change method signature to 'public static IEnumerable<Task<TOut>> ForEachAsync<TIn, TOut>( IEnumerable<TIn> inputEnumerable, Func<TIn, Task<TOut>> asyncProcessor, int? maxDegreeOfParallelism = null)'Deeann
Two remarks about the ForEachAsync method: 1. The asyncProcessor delegate is invoked invariably for all inputs, even in case an invocation has failed. On the contrary the .NET 6 Parallel.ForEachAsync completes ASAP in case of an error, which arguably is a more reasonable/desirable behavior. 2. This implementation essentially creates a number of workers equal to the number of items, and all workers are waiting to acquire asynchronously the same semaphore. It allocates more memory than the .NET 6 Parallel.ForEachAsync. For huge source sequences, the memory-overhead might be significant.Gallup
G
21

Since the release of .NET 6 (in November 2021), and for all applications except ASP.NET, the recommended way of limiting the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, with the MaxDegreeOfParallelism configuration. Here is how it can be used in practice:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait it synchronously if you need to, which will block the current thread until the completion of all asynchronous operations. The synchronous Wait has the advantage that in case of errors, all exceptions will be propagated. On the contrary the await operator propagates by design only the first exception. In case this is a problem, you can find solutions here.
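
One way to observe every exception while still using await is to keep a reference to the task and inspect its Exception property after the await fails. A minimal sketch, assuming .NET 6 or later; the failing body and the item range are made up for the demo, and how many exceptions end up being collected depends on timing:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// Keep a reference to the task so that its Exception property can be inspected later.
Task loop = Parallel.ForEachAsync(Enumerable.Range(1, 5), options, async (item, cancellationToken) =>
{
    await Task.Delay(10); // stand-in for an I/O operation
    throw new InvalidOperationException($"Item {item} failed.");
});

try
{
    await loop; // rethrows only the first exception
}
catch (InvalidOperationException)
{
    // The task itself carries an AggregateException with every exception it collected.
    AggregateException all = loop.Exception;
    if (all is not null)
    {
        foreach (Exception ex in all.InnerExceptions)
            Console.WriteLine(ex.Message);
    }
}
```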


Note about ASP.NET (not official¹): The Parallel.ForEachAsync API works by launching many workers (tasks) on the ThreadPool, and all the workers are invoking the body delegate in parallel. This goes against the advice offered in the MSDN article Async Programming : Introduction to Async/Await on ASP.NET:

You can kick off some background work by awaiting Task.Run, but there’s no point in doing so. In fact, that will actually hurt your scalability by interfering with the ASP.NET thread pool heuristics. If you have CPU-bound work to do on ASP.NET, your best bet is to just execute it directly on the request thread. As a general rule, don’t queue work to the thread pool on ASP.NET.

So using the Parallel.ForEachAsync in an ASP.NET application could harm the scalability of the application. In ASP.NET applications concurrency is OK, but parallelism should be avoided.

From the currently submitted answers, only Dogu Arslan's answer is suitable for ASP.NET applications, although it doesn't have ideal behavior in case of exceptions (in case of an error the Task might not complete fast enough).

¹ The above note about ASP.NET is my personal suggestion, based on my overall understanding of the technology. It's not an official guideline by Microsoft.

Gallup answered 21/10, 2020 at 1:49 Comment(7)
Is there any official Microsoft documentation stating that Parallel.ForEachAsync should not be used in ASP.NET core code ? The issue you mentioned in your comment for ASP.NET is here right ? Thanks for the clarification.Tetrachord
Let me clarify my previous comment. Based on my understanding of Task.Run and given the linked source code for Parallel.ForEachAsync I think the guideline you proposed for ASP.NET core code is fine. Just asking myself if there is anything official by Microsoft about the intended usage of Parallel.ForEachAsync since this is a common problem.Tetrachord
As a side note, if you look at this GitHub issue, Stephen Toub seems to suggest Parallel.ForEachAsync as an exact replacement for the SemaphoreSlim pattern to limit concurrency. No mention is made there about not using this approach in ASP.NET Core code. Maybe this means nothing, since the question never mentioned ASP.NET.Tetrachord
@EnricoMassone as long as the body of the parallel loop is completely asynchronous, using the Parallel.ForEachAsync in ASP.NET should be OK. The problem arises when the body includes code that blocks the thread, in which case you will have multiple blocked threads per web request, eventually saturating the ThreadPool.Gallup
@EnricoMassone to be clear, what is mentioned in this answer about ASP.NET is my personal suggestion, based on my overall understanding of the technology. It's not an official suggestion by Microsoft.Gallup
Thanks for the clarification. Based on my understanding I agree with you anyway. I was just curious since I was not able to find any official guideline. Maybe I will post a question issue to them on github asking for some guidance and / or official documentation improvement.Tetrachord
@EnricoMassone thanks for your comments. I edited the answer so that it doesn't give the impression of being an official guideline from Microsoft. Before opening any issue on GitHub, please make sure to read this one first. It provides useful context.Gallup
C
11

There are a lot of pitfalls and direct use of a semaphore can be tricky in error cases, so I would suggest to use AsyncEnumerator NuGet Package instead of re-inventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /* ... */ };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
Contract answered 26/8, 2016 at 21:30 Comment(1)
As noted in prior posts you should not be creating new HttpClients in any kind of loop unless you actually enjoy socket exhaustion issues in production.Unappealable
L
7

Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in.

Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.

Lek answered 29/5, 2012 at 21:49 Comment(4)
Note that "Unfortunately, the .NET Framework is missing most important combinators for orchestrating parallel async tasks. There is no such thing built-in." is no longer correct as of .NET 4.5 Beta. SemaphoreSlim now offers WaitAsync(...) functionality :)Deshabille
Should SemaphoreSlim (with its new async methods) be preferred over AsyncSemaphore, or does Toub's implementation still have some advantage?Jabez
In my opinion, the built-in type should be preferred because it is likely to be well-tested and well-designed.Lek
Stephen added a comment in response to a question on his blog post confirming that using SemaphoreSlim for .NET 4.5 would generally be the way to go.Winged
P
3

The SemaphoreSlim can be very helpful here. Here's the extension method I've created:

/// <summary>Concurrently executes an async action for each item of an
/// <see cref="IEnumerable{T}"/>.</summary>
/// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
/// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">An async delegate to execute for each item</param>
/// <param name="maxDegreeOfParallelism">Optional. An integer that represents the
/// maximum degree of parallelism. Must be greater than 0.</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism
/// is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    try
                    {
                        await action(item);
                    }
                    finally
                    {
                        // action has completed (successfully or not), so decrement
                        // the number of currently running tasks
                        semaphoreSlim.Release();
                    }
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
    

Sample usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Pule answered 9/5, 2018 at 13:4 Comment(6)
Is there still nothing built into the framework that does this?Arawak
Did you ever make a SelectAsyncConcurrent version of this?Arawak
@Arawak I don't think framework has any built-in mechanism for this as of now.Pule
@Arawak No, I have not built SelectAsyncConcurrent version, but that would be an interesting implementation.Pule
I just made a very clumsy one that simply calls ForEachAsyncConcurrent. I only needed it in one place so it was fine. I just created a ConcurrentStack and added items to it inside a call to your function. The ordering wasn't important for me, but if anyone else attempts it don't use a List because a) it's not thread safe and b) the results may not come back in the same order anyway.Arawak
The same answer has been posted here.Gallup
J
0

Although 1000 tasks might be queued very quickly, the Parallel Tasks library can only handle concurrent tasks equal to the amount of CPU cores in the machine. That means that if you have a four-core machine, only 4 tasks will be executing at a given time (unless you lower the MaxDegreeOfParallelism).

Jiles answered 29/5, 2012 at 21:32 Comment(4)
Yep, but that doesn't relate to async I/O operations. The code above will fire up 1000+ simultaneous downloads even if it is running on a single thread.Substage
Didn't see the await keyword in there. Removing that should solve the problem, correct?Jiles
The library certainly can handle more tasks running (with the Running status) concurrently than the number of cores. This will especially be the case with I/O-bound Tasks.Parthen
@svick: yep. Do you know how to efficiently control the max concurrent TPL tasks (not threads)?Substage
P
0

In newer versions of .NET (Core 1.0 or higher), you can use the built in TPL Dataflow.

using System.Threading.Tasks.Dataflow;

var client = new HttpClient();

var block = new TransformBlock<string, string>(
    client.GetStringAsync,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
foreach (string url in urls) {
    block.Post(url);
}
block.Complete();

string[] htmls = await block.ReceiveAllAsync().ToArrayAsync();

This assumes you actually need the received contents. With Dataflow you can also do far more complex jobs than this.

Note that you need to install System.Linq.Async package for ToArrayAsync.


As mentioned in comments, ReceiveAllAsync is possibly hazardous if GetStringAsync fails. In this case, if you want to stop the pipeline and propagate exceptions if any exception happens, do not use ReceiveAllAsync:

var htmls = new List<string>();
while (await block.OutputAvailableAsync())
{
    while (block.TryReceive(out string result))
    {
        htmls.Add(result);
    }
}
await block.Completion; // This propagates exceptions

Or if you want to proceed, but record all the exceptions:

var block = new TransformBlock<string, (string? html, Exception? exception)>(
    async url =>
    {
        try
        {
            return (await client.GetStringAsync(url), null);
        }
        catch (Exception e)
        {
            return (null, e);
        }
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);

(string? html, Exception? exception)[] results =
    await block.ReceiveAllAsync().ToArrayAsync();
Push answered 5/4, 2023 at 4:33 Comment(4)
There are a few problems with this. All URLs are posted in the block upfront, so it's not suitable for huge (iterator-generated) input sequences. OperationCanceledExceptions thrown by the GetStringAsync are ignored by design. All exceptions are ignored because of a bug in the ReceiveAllAsync. For these reasons I don't agree with the "you should".Gallup
Now it's better, but I still don't agree with the "you should", because it implies that the TPL Dataflow is clearly the best solution for this problem, by a wide margin. IMHO the Parallel.ForEachAsync API is at least equal, if not better as a solution.Gallup
@TheodorZoulias I don't agree Parallel.ForEachAsync is an equivalent. You need a thread-safe container (say ConcurrentBag<T>) to store the results, and this does not preserve order. PLINQ (urls.AsParallel().WithDegreeOfParallelism(8).Select...) is more of a equivalent but it does block. Anyway, to clarify the answer, I change the phrasing to "you can".Push
Sure, the Parallel.ForEachAsync doesn't collect the results, but the question makes no mention about results. The OP just wants to know how to limit the concurrency of asynchronous I/O operations. A question that might be more relevant to your answer is this: ForEachAsync with Result.Gallup
T
-1

This is not good practice, as it changes a global variable. It is also not a general solution for async. But it is an easy fix for all instances of HttpClient, if that's all you're after. You can simply try:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
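
As far as I know, on .NET Core and .NET 5+ HttpClient does not consult ServicePointManager, so the setting above mainly applies to .NET Framework. Here is a minimal sketch of a per-client alternative, assuming the HttpClientHandler.MaxConnectionsPerServer property is available on your target framework. Like DefaultConnectionLimit, it caps connections per server endpoint, not in total, so it does not throttle requests that go to many different hosts.

```csharp
using System;
using System.Net.Http;

// Cap the number of simultaneous connections this client opens to any single server.
var handler = new HttpClientHandler { MaxConnectionsPerServer = 20 };

// Create the client once and reuse it for all requests.
var client = new HttpClient(handler);

Console.WriteLine($"Max connections per server: {handler.MaxConnectionsPerServer}");
```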
Thigh answered 29/3, 2019 at 8:36 Comment(0)
C
-1

Here is a handy Extension Method you can create to wrap a list of tasks such that they will be executed with a maximum degree of concurrency:

/// <summary>Allows to do any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
    return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
    {
        try { return task; }
        finally { maxOperations.Release(); }
    }).Unwrap());
}

Now instead of:

await Task.WhenAll(someTasks);

You can go

await Task.WhenAll(someTasks.WithMaxConcurrency(20));
Carny answered 18/5, 2022 at 19:1 Comment(0)
J
-2

Parallel computations should be used for speeding up CPU-bound operations. Here we are talking about I/O bound operations. Your implementation should be purely async, unless you're overwhelming the busy single core on your multi-core CPU.

EDIT I like the suggestion made by usr to use an "async semaphore" here.

Jayejaylene answered 29/5, 2012 at 21:34 Comment(11)
Good point! Though each task here will contain async and sync code (page downloaded asynchronously then processed in sync manner). I am trying to distribute the sync portion of the code across CPUs and at the same time limit the amount of concurrent async I/O operations.Substage
Why? Because launching 1000+ http requests simultaneously might not be a task well suited to the user's network capacity.Labellum
Parallel extensions can also be used as a way to multiplex I/O operations without having to manually implement a pure async solution. Which I agree could be considered sloppy, but as long as you keep a tight limit on the number of concurrent operations it probably won't strain the threadpool too much.Crossways
Don't run long running/blocking operations in the ThreadPool. @SeanU Your suggestion is bad practice and can cause many unintended and nasty side-effects.Labellum
I don't think this answer is providing an answer. Being purely async is not enough here: We really want to throttle the physical IOs in a non-blocking manner.Lek
@Labellum Aside from consuming the entire pool if you don't keep a limit on how many threads you consume, what other unintended or nasty side-effects are there I should be worried about?Crossways
Well, in ideal circumstances, the "entire pool" should really only represent the # processors in the system. Anything larger represents a strained ThreadPool. Because the ThreadPool is reluctant to spin up extra threads and will only do so under sustained stress, other operations that rely on a fluid ThreadPool will now be affected by this implicit latency. For instance: System.Threading.Timer fires its callbacks on the ThreadPool. Now, with only a few long-lived tasks in the ThreadPool, they're not coming in on time.Labellum
How bad can that really get? For example, is it going to introduce worse delays than a collection of generation 2? I ask because I learned the idiom from Microsoft sample code on how to use TPL, which would seem to imply that it's not the worst practice in the world.Crossways
It can get pretty bad. #10782353Labellum
Thread pool starvation is rather more extreme than what you originally describe. And was addressed in my original comment.Crossways
Hmm.. not sure I agree... when working on a large project, if one too many developers takes this view, you'll get starvation even though each developer's contribution in isolation is not enough to tip things over the edge. Given that there is only one ThreadPool, even if you're treating it semi-respectfully... if everyone else is doing the same, trouble can follow. As such I always advise against running long stuff in the ThreadPool.Labellum
C
-2

Essentially you're going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we only create the Action here; it is not invoked yet.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

And then creating your list of Tasks and calling the function to have them run, with say a maximum of 20 simultaneous at a time, you could do this:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
Colostomy answered 29/4, 2016 at 8:34 Comment(2)
I think you are just specifying initialCount for SemaphoreSlim and you need to specify 2nd parameter i.e. maxCount in the constructor of SemaphoreSlim.Pule
I want each response from each task processed into a List. How can I get return Result or responseLinell
