Have a set of Tasks with only X running at a time

Let's say I have 100 tasks that each do something that takes 10 seconds. Now I want to run only 10 at a time, so that whenever one of those 10 finishes, another task starts, until all are finished.

Until now I have always used ThreadPool.QueueUserWorkItem() for this kind of task, but I've read that it is bad practice to do so and that I should use Tasks instead.

My problem is that I haven't found a good example for my scenario anywhere, so could you get me started on how to achieve this with Tasks?

Trinetta answered 28/12, 2012 at 19:58 Comment(3)
I would suggest reading some articles and/or previous Stack Overflow posts; there are plenty of code examples that others have tried, with answers provided, e.g. #6193398. Do a Google search like I did: "C# Stackoverflow ThreadPool.QueueUserWorkItem()". – Reasoned
Do you want a method that blocks until all of the tasks are done, or a method that returns a Task when all of the tasks are done? – Purdah
It should block, just like with the ThreadPool, but using tasks. Some people here on Stack Overflow told me, on one of my code samples, that using the ThreadPool is bad practice. – Trinetta
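using System.Threading;
using System.Threading.Tasks;

// Allows at most 10 work items to run at the same time.
SemaphoreSlim maxThread = new SemaphoreSlim(10);

for (int i = 0; i < 115; i++)
{
    // Blocks the calling (main) thread until one of the 10 slots is free.
    maxThread.Wait();
    Task.Factory.StartNew(() =>
        {
            //Your Works
        }
        , TaskCreationOptions.LongRunning)
    // Frees the slot once the work item has finished, whether or not it threw.
    .ContinueWith( (task) => maxThread.Release() );
}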
Romans answered 28/12, 2012 at 20:20 Comment(7)
Why do you specify TaskCreationOptions.LongRunning? – Aurelea
I would recommend using the Semaphore class instead of SemaphoreSlim if your task is intended to be a long-running task. – Histo
@MarcGravell I may be missing something, but I only see the main thread running the loop and the 10 workers running at the same time. The blocked one is the main thread (1). – Romans
@L.B. I apologise; I misread the location of the wait. My bad; I read in haste and erred. – Percussionist
@Romans How do I add a new task to the queue whenever one of the 10 running tasks completes? For example, if we have already set a limit of 10 tasks running at a time with SemaphoreSlim or MaxDegreeOfParallelism, I don't want to create 100 tasks up front and then use SemaphoreSlim or MaxDegreeOfParallelism to run 10 at a time. I only want to create a new task when one of the 10 tasks completes, and this process should continue indefinitely. – Plato
@Plato My answer does exactly that. Just replace the for loop with an infinite loop. – Romans
This will not wait for the last 10 tasks in the worst case, or for at least 1 task in the best case. – Residual
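As the last comment points out, the semaphore loop above returns as soon as the final item has been started, without waiting for the last few to finish. A minimal sketch of one way to keep the same SemaphoreSlim idea and also block until everything is done: collect the tasks and wait on all of them at the end. It swaps StartNew(..., LongRunning) plus ContinueWith for Task.Run with a try/finally; `ThrottledRunner`, `RunThrottled` and its parameters are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Hypothetical helper: runs 'count' work items, at most 'maxConcurrent' at a time,
    // and blocks the caller until every one of them has finished.
    public static void RunThrottled(int count, int maxConcurrent, Action<int> work)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        var tasks = new List<Task>(count);

        for (int i = 0; i < count; i++)
        {
            gate.Wait();   // block until one of the slots is free
            int item = i;  // copy the loop variable so each closure gets its own value
            tasks.Add(Task.Run(() =>
            {
                try { work(item); }
                finally { gate.Release(); }   // free the slot even if work throws
            }));
        }

        // Also wait for the last tasks; throws an AggregateException if any work item failed.
        Task.WaitAll(tasks.ToArray());
    }
}
```

For the question's scenario this would be called as `ThrottledRunner.RunThrottled(100, 10, i => Thread.Sleep(10_000));`.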

TPL Dataflow is great for doing things like this. You can create a 100% async version of Parallel.Invoke pretty easily:

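using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Runs func on every item, with at most 10 invocations in flight at once.
async Task ProcessTenAtOnce<T>(IEnumerable<T> items, Func<T, Task> func)
{
    ExecutionDataflowBlockOptions edfbo = new ExecutionDataflowBlockOptions
    {
         MaxDegreeOfParallelism = 10
    };

    // The ActionBlock invokes func for each item it receives, respecting the limit above.
    ActionBlock<T> ab = new ActionBlock<T>(func, edfbo);

    foreach (T item in items)
    {
         await ab.SendAsync(item);
    }

    // Signal that no more items will arrive, then wait for the queued ones to finish.
    ab.Complete();
    await ab.Completion;
}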
Lauranlaurance answered 28/12, 2012 at 21:57 Comment(5)
The TPL Dataflow library is actually super cool, and I've already found a use for it. Thanks for pointing it out! – Trustbuster
That's a great approach. Is it possible to make it return results, though? I'm thinking of using it to call a service that takes a while to respond. – Soosoochow
@GabrielEspinoza This is only a tiny bit of what TPL Dataflow can do. You might be able to use a TransformBlock for what you want. – Lauranlaurance
The NuGet package for TPL Dataflow has now been unlisted. It has been replaced with System.Threading.Tasks.Dataflow. – Weald
Could you update this demo to show where a user should add the commands to execute on each item, and how to pass in parameters for the items? – Dorsiventral
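Regarding the questions about returning results and where the per-item work goes: a minimal sketch using a TransformBlock, as suggested in the comments, assuming the System.Threading.Tasks.Dataflow package is referenced. The per-item work is whatever you pass as `func`, and the item itself is its parameter. `DataflowHelpers`, `TransformAllAsync` and `maxParallel` are hypothetical names. With the default options a TransformBlock emits results in the same order as its inputs, even while several items are processed concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowHelpers
{
    // Hypothetical helper: applies func to every item, at most maxParallel at a time,
    // and returns the results in input order.
    public static async Task<List<TResult>> TransformAllAsync<T, TResult>(
        IEnumerable<T> items, Func<T, Task<TResult>> func, int maxParallel = 10)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel };
        var block = new TransformBlock<T, TResult>(func, options);

        // Feed the inputs, then signal that no more are coming.
        foreach (T item in items)
        {
            await block.SendAsync(item);
        }
        block.Complete();

        // Drain the outputs as they become available.
        var results = new List<TResult>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out TResult result))
            {
                results.Add(result);
            }
        }

        // Rethrows if any invocation of func failed.
        await block.Completion;
        return results;
    }
}
```

Usage would look like `var bodies = await DataflowHelpers.TransformAllAsync(urls, url => httpClient.GetStringAsync(url));`, where `urls` and `httpClient` stand for your own inputs and service client.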

You have several options. You can use Parallel.Invoke for starters:

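using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
public void DoWork(IEnumerable<Action> actions)
{
    // Blocks until every action has run, with at most 10 running in parallel.
    Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 10 }
        , actions.ToArray());
}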

Here is an alternate option that will work much harder to have exactly 10 tasks running (although the number of threads in the thread pool processing those tasks may be different) and that returns a Task indicating when it finishes, rather than blocking until done.

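using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public Task DoWork(IList<Action> actions)
{
    List<Task> tasks = new List<Task>();
    int numWorkers = 10;
    // Round up so the actions are split into at most numWorkers batches.
    int batchSize = (int)Math.Ceiling(actions.Count / (double)numWorkers);
    // Passing batchSize (not actions.Count / 10) keeps the number of batches at or below numWorkers.
    foreach (var batch in actions.Batch(batchSize))
    {
        // One task per batch; each task runs its batch's actions sequentially.
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var action in batch)
            {
                action();
            }
        }));
    }

    // Completes when every batch has finished, instead of blocking the caller.
    return Task.WhenAll(tasks);
}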

If you don't have MoreLinq for the Batch function, here's my simpler implementation:

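using System.Collections.Generic;

// Extension methods must live in a non-generic static class; the class name is arbitrary.
public static class BatchExtensions
{
    // Splits source into consecutive lists of batchSize items; the last list may be shorter.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
    {
        List<T> buffer = new List<T>(batchSize);

        foreach (T item in source)
        {
            buffer.Add(item);

            if (buffer.Count >= batchSize)
            {
                yield return buffer;
                buffer = new List<T>(batchSize);
            }
        }
        // "> 0" (rather than ">= 0") avoids yielding an empty trailing batch.
        if (buffer.Count > 0)
        {
            yield return buffer;
        }
    }
}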
Purdah answered 28/12, 2012 at 20:4 Comment(6)
"Now I want to only run 10 at a time": MaxDegreeOfParallelism is only an upper bound. – Romans
@Romans Well, technically, if the number of units of work isn't divisible by 10, then exactly ten isn't possible. – Purdah
No, even with 100 work items it may end up running fewer tasks. It depends on many parameters, such as the number of CPUs. – Romans
@Romans I'm aware of that. I was stating that even if you tried to get closer to exactly 10, you can't always be perfect; you can only be... closer. – Purdah
@Romans Does the additional version I added satisfy you? – Purdah
I would do it differently, so I posted it as an answer. – Romans
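To see the point that MaxDegreeOfParallelism is only an upper bound, one can record how many actions actually run at the same time under Parallel.Invoke. A minimal sketch; `ParallelismProbe` and `MeasurePeakConcurrency` are hypothetical names, and the peak it reports depends on the machine (CPU count, thread-pool state), so the only thing to rely on is that it never exceeds the limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelismProbe
{
    // Hypothetical helper: runs 'count' actions via Parallel.Invoke with the given limit
    // and returns the highest number of actions observed running simultaneously.
    public static int MeasurePeakConcurrency(int count, int maxDegree, int workMs)
    {
        int running = 0, peak = 0;
        object sync = new object();

        Action[] actions = Enumerable.Range(0, count).Select(_ => (Action)(() =>
        {
            lock (sync)
            {
                running++;
                peak = Math.Max(peak, running);
            }
            Thread.Sleep(workMs);   // simulate blocking work
            lock (sync) { running--; }
        })).ToArray();

        Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = maxDegree }, actions);
        return peak;
    }
}
```

Running `ParallelismProbe.MeasurePeakConcurrency(100, 10, 100)` on machines with different core counts is one way to observe the behaviour discussed in the comments.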

You can create a method like this:

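using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static async Task RunLimitedNumberAtATime<T>(int numberOfTasksConcurrent, 
    IEnumerable<T> inputList, Func<T, Task> asyncFunc)
{
    Queue<T> inputQueue = new Queue<T>(inputList);
    List<Task> runningTasks = new List<Task>(numberOfTasksConcurrent);
    // Start the first batch of up to numberOfTasksConcurrent tasks.
    for (int i = 0; i < numberOfTasksConcurrent && inputQueue.Count > 0; i++)
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));

    // Each time a task finishes, replace it with the next input.
    while (inputQueue.Count > 0)
    {
        Task task = await Task.WhenAny(runningTasks);
        runningTasks.Remove(task);
        // Observe the finished task so a failure is rethrown here instead of being silently dropped.
        await task;
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
    }

    // Wait for the last tasks to finish.
    await Task.WhenAll(runningTasks);
}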

And then you can call any async method n times with a limit like this:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    async x =>
    {
        Console.WriteLine($"Starting task {x}");
        await Task.Delay(100);
        Console.WriteLine($"Finishing task {x}");
    });

Or, if you want to run long-running non-async methods, you can do it this way:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    x => Task.Factory.StartNew(() => {
        Console.WriteLine($"Starting task {x}");
        System.Threading.Thread.Sleep(100);
        Console.WriteLine($"Finishing task {x}");
    }, TaskCreationOptions.LongRunning));

Maybe there is a similar method somewhere in the framework, but I haven't found it yet.

Kalle answered 25/4, 2018 at 13:58 Comment(1)
I've adapted this code to run different tasks with no parameters; it works well. – Tuberous
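On whether the framework has something similar: .NET 6 added Parallel.ForEachAsync, which takes a ParallelOptions.MaxDegreeOfParallelism limit and an async body, and returns a Task that completes once every item has been processed. A minimal sketch assuming .NET 6 or later, mirroring the example above; `ForEachAsyncDemo` and `RunAsync` are hypothetical names and the delay stands in for real async work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Runs 100 async work items with at most 10 in flight and returns how many completed.
    public static async Task<int> RunAsync()
    {
        int completed = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

        await Parallel.ForEachAsync(Enumerable.Range(1, 100), options, async (x, ct) =>
        {
            Console.WriteLine($"Starting task {x}");
            await Task.Delay(100, ct);   // placeholder for real async work
            Console.WriteLine($"Finishing task {x}");
            Interlocked.Increment(ref completed);
        });

        return completed;
    }
}
```

Because the body is async, this variant suits I/O-bound work as well as CPU-bound work; to block as the question asks, call `ForEachAsyncDemo.RunAsync().GetAwaiter().GetResult()`.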

I would go with the simplest solution I can think of, which I think is using the TPL:

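using System.Threading.Tasks;

string[] urls={};
// Blocks until all URLs are processed, with at most 2 at a time (use 10 for the question's scenario).
Parallel.ForEach(urls, new ParallelOptions() { MaxDegreeOfParallelism = 2}, url =>
{
   //Download the content or do whatever you want with each URL
});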
Octavie answered 1/9, 2014 at 9:15 Comment(1)
If the tasks are CPU-bound, then yes. If the tasks are I/O-bound, then definitely not. https://mcmap.net/q/414472/-have-a-set-of-tasks-with-only-x-running-at-a-time – Surveillance
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var rnd = new Random();
            var throttler = new SemaphoreSlim( /*degreeOfParallelism:*/ 5);

            var tasks = Enumerable.Range(1, 20).Select(i => PrintNumber(i, throttler, rnd));
            Task.WhenAll(tasks).GetAwaiter().GetResult(); // block until all 20 tasks have finished

            Console.ReadKey();
        }

        private static async Task PrintNumber(int number, SemaphoreSlim throttler, Random random)
        {
            using (await throttler.Throttle().ConfigureAwait(false))
            {
                int seconds;
                lock (random) { seconds = random.Next(1, 5); } // Random is not thread-safe
                await Task.Delay(TimeSpan.FromSeconds(seconds)).ConfigureAwait(false);
                Console.WriteLine(number);
            }
        }
    }

    internal static class ThrottlerExt // https://gaevoy.com/2019/01/27/throttling-async-task.html, 2023-12-21
    {
        internal static async Task<IDisposable> Throttle(this SemaphoreSlim throttler)
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            return new Throttler(throttler);
        }

        private class Throttler : IDisposable
        {
            private readonly SemaphoreSlim _throttler;

            public Throttler(SemaphoreSlim throttler) => _throttler = throttler;

            public void Dispose() => _throttler.Release();
        }
    }
}
Surveillance answered 26/4, 2024 at 14:35 Comment(0)
