How to correctly write Parallel.For with async methods [duplicate]
Asked Answered
N

4

42

How would I structure the code below so that the async method gets invoked?

Parallel.For(0, elevations.Count(), delegate(int i)
{
   allSheets.AddRange(await BuildSheetsAsync(userID, elevations[i], includeLabels));
});
Namedropping answered 9/10, 2013 at 22:55 Comment(0)
M
52

Parallel.For() doesn't work well with async methods. If you don't need to limit the degree of parallelism (i.e. you're okay with all of the tasks executing at the same time), you can simply start all the Tasks and then wait for them to complete:

var tasks = Enumerable.Range(0, elevations.Count())
    .Select(i => BuildSheetsAsync(userID, elevations[i], includeLabels));
List<Bitmap> allSheets = (await Task.WhenAll(tasks)).SelectMany(x => x).ToList();
Microelement answered 10/10, 2013 at 9:50 Comment(6)
I'm not able to add the collection of List<Bitmps> it is asking for IEnnumerable. how to do i get this to work? var task = Enumerable.Range(0, elevations.Length).Select(i => BuildSheetsAsync(userID, elevations[i], includeLabels)); List<Bitmap> allSheets = new List<Bitmap>(); allSheets.AddRange(await Task.WhenAll(task)); The error occurs when I allSheets.AddRange(await Task.WhenAll(task));Namedropping
@AlumCloud.Com Ah, I didn't notice BuildSheetsAsync() already returns a collection. If you want to make a single collection out of a collection of collections, you can use SelectMany(). And to make a list out of an IEnumerable collection, use ToList().Microelement
"Parallel.For() doesn't work well with async methods." - Why shouldn't they work well together? They are two different concepts that help increase CPU efficiency. "If you don't need to limit the degree of parallelism..." - It seems counter-productive to make this assumption. A good program would purposely limit the degree of parallelism based on the capabilities of the server it's running on, which could change.Chucklehead
@Chucklehead They don't work well together, because with await, processing ends when the returned Task is completed, not when the function returns. But Parallel.For does not understand that and assumes that a function ends when it returns. And limiting the degree of parallelism can be important, but it can also be complicated. If I don't know whether the complicated approach is necessary, I recommend starting with the simple one.Microelement
How would you go about setting a limit a la maximum degree of parallelism as you would with Parallel.For?Ardell
For MaxDegreeOfParallelism problems see this answerMetaphysics
U
7

You can try this code I'm using. it using foreach and SemaphoreSlim to achive parallel asynchronous.

public static class ParallelAsync
{
    public static async Task ForeachAsync<T>(IEnumerable<T> source, int maxParallelCount, Func<T, Task> action)
    {
        using (SemaphoreSlim completeSemphoreSlim = new SemaphoreSlim(1))
        using (SemaphoreSlim taskCountLimitsemaphoreSlim = new SemaphoreSlim(maxParallelCount))
        {
            await completeSemphoreSlim.WaitAsync();
            int runningtaskCount = source.Count();

            foreach (var item in source)
            {
                await taskCountLimitsemaphoreSlim.WaitAsync();

                Task.Run(async () =>
                {
                    try
                    {
                        await action(item).ContinueWith(task =>
                        {
                            Interlocked.Decrement(ref runningtaskCount);
                            if (runningtaskCount == 0)
                            {
                                completeSemphoreSlim.Release();
                            }
                        });
                    }
                    finally
                    {
                        taskCountLimitsemaphoreSlim.Release();
                    }
                }).GetHashCode();
            }

            await completeSemphoreSlim.WaitAsync();
        }
    }
}

usage:

string[] a = new string[] {
    "1",
    "2",
    "3",
    "4",
    "5",
    "6",
    "7",
    "8",
    "9",
    "10",
    "11",
    "12",
    "13",
    "14",
    "15",
    "16",
    "17",
    "18",
    "19",
    "20"
};

Random random = new Random();

await ParallelAsync.ForeachAsync(a, 2, async item =>
{
    Console.WriteLine(item + " start");

    await Task.Delay(random.Next(1500, 3000));
    Console.WriteLine(item + " end");
});

Console.WriteLine("All finished");

any suggestion please let me know.

Unlearned answered 15/10, 2019 at 5:1 Comment(1)
Seems the code does not consider the case when action(item) raise an exceptionBarbera
S
3

I'd recommend you to take a look at this question I asked a few days ago and ended-up answering myself, basically I was looking for a parallel and asynchronous ForEach method.

The method uses SemaphoreSlim to process things in parallel and it accepts asynchronous methods as an input action.

You might also want to take a look at the two links I have provided at the end of my answer, they have been really helpful for realizing such behavior and they also contain another way of doing this using a Partitioner instead.

Personally, I didn't like the Parallel.For because it's a synchronous call as explained in the links I've given; I wanted it all 'async' :-)

Here it is : Asynchronously and parallelly downloading files

Succinct answered 9/10, 2013 at 23:10 Comment(1)
+1, besides SemaphoreSlim apparently is not required in the OP's case. Also may I suggest using CancellationTokenSource with timeout instead of Timer object in your code.Cimino
D
-6

The easiest way to invoke your async method inside Parallel.For is next:

Parallel.For(0, elevations.Count(), async i =>
{
   allSheets.AddRange(await BuildSheetsAsync(userID, elevations[i], includeLabels));
});

==============

MarioDS mentioned absolutely right in the comment that in that case you may have unobserved exceptions. And this is definitely very important thing which you should always take in mind then have a deal with async delegates.

In this case if you think that you will have exceptions you can use try/catch block inside delegate. Or in some cases if your situation is good for it you can subscribe on TaskScheduler.UnobservedTaskException event.

Dover answered 29/2, 2016 at 6:37 Comment(1)
This leads to bugs. Async delegates return Task and the Parallel construct doesn't await that task. This means exceptions are unobserved and you can't be sure after the Parallel.For invocation has run that all the work has actually been completed (nor can you verify easily).Meliamelic

© 2022 - 2024 — McMap. All rights reserved.