ForEachAsync with Result

I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...

Stephen's extension:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

My approach (not working; tasks get executed but result is wrong)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

I know I somehow have to return (WhenAll?) the results from the last part, but I haven't figured out how to do it yet...

Update: The result I get is just degreeOfParallelism times null (I guess because of default(TResult)) even though all the tasks get executed. I also tried to return await body(...) and then the result was fine, but only degreeOfParallelism number of tasks got executed.

Pooch answered 18/6, 2015 at 6:6 Comment(11)
"Result is wrong" really doesn't describe what you're seeing at all. The fact that you're returning default(TResult) doesn't seem like a good start. It would help if you'd provide a short but complete program demonstrating the problem, including sample input, expected output and actual output. (I strongly suspect you want SelectMany instead of Select here, basically...) - Areopagus
@JonSkeet: added an update - Pooch
Couldn’t you make a list to which you add your results, and then return it at the very end after everything has finished? - Ammonic
@poke: thought of that too but I believe this is not really async-like?!? - Pooch
It doesn't help that the code you've posted currently wouldn't compile... - Areopagus
@JonSkeet: I'm sorry, somehow the brackets didn't get copied and pasted... It should work now... - Pooch
Nope, you appear to have a parameter of type Func<T> with no name. Again, if you'd posted a short but complete example to start with, you could be confident that it would compile because it would be your exact code. - Areopagus
@JonSkeet: My bad, I apologize... updated it again. - Pooch
@Dunken: Are you absolutely sure you need a parallel and asynchronous foreach? Parallel implies CPU-heavy code, and asynchronous implies I/O-heavy code. Yours is both? If you are in this (very rare) situation, consider using TPL Dataflow. - Sena
@StephenCleary: good question... I believe yes. Actually I want to stress-test an API (I also want to measure the throughput). I first went with tasks only but this overwhelmed my system (too many HTTP connections?). Because of this I now switched to the current solution, which allows me to call my API a few thousand times with a fixed number of concurrent calls... Do you agree? Thanks for the hint. I do have to admit I don't know it very well but I think it's worth digging in... - Pooch
You can achieve throttling for asynchronous code via SemaphoreSlim. I just tend to avoid parallel code unless there's a need for it (i.e., CPU-heavy code, which is not the case here). - Sena
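The SemaphoreSlim throttling mentioned in the last comment could look roughly like the sketch below. The class and method names (ThrottledExtensions, SelectThrottledAsync) and the maxConcurrency parameter are made up for illustration and do not come from the thread. The sketch starts one task per item up front and lets the semaphore limit how many invocations of body are in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions
{
    // Hypothetical helper: invokes body for every item, with at most
    // maxConcurrency invocations running at any given time.
    public static async Task<TResult[]> SelectThrottledAsync<T, TResult>(
        this IEnumerable<T> source, int maxConcurrency, Func<T, Task<TResult>> body)
    {
        using var semaphore = new SemaphoreSlim(maxConcurrency);

        // ToList() materializes the query so that each task is created exactly once.
        List<Task<TResult>> tasks = source.Select(async item =>
        {
            // Wait asynchronously for a free slot before calling body.
            await semaphore.WaitAsync().ConfigureAwait(false);
            try
            {
                return await body(item).ConfigureAwait(false);
            }
            finally
            {
                // Free the slot even if body throws.
                semaphore.Release();
            }
        }).ToList();

        // Task.WhenAll returns the results in the same order as the tasks,
        // which here matches the order of the source sequence.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

For the stress-test scenario described above, a call might look like `await urls.SelectThrottledAsync(10, url => httpClient.GetStringAsync(url))`, where urls and httpClient are assumed to exist in the calling code. No extra threads are requested via Task.Run, which fits I/O-bound work.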

Your LINQ query can only ever have the same number of results as the number of partitions - you're just projecting each partition into a single result.

If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.

public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
                    {
                        var list = new List<TResult>();
                        using (partition)
                        {
                            while (partition.MoveNext())
                            {
                                list.Add(await body(partition.Current));
                            }
                        }
                        return list;
                   })));
     return lists.SelectMany(list => list).ToArray();
}

(I've renamed this from ForEachAsync, as ForEach sounds imperative (suitable for the Func<T, Task> in the original) whereas this is fetching results. A foreach loop doesn't have a result - this does.)
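If the order does matter, one possible variation is to pair each element with its index and write each result into a preallocated array slot. The following is only a sketch under that assumption: the name ExecuteInParallelOrdered and the containing class are hypothetical, and it relies on the source being an IList<T> so that the count is known up front.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelExtensions
{
    // Hypothetical order-preserving variant: results[i] corresponds to source[i].
    public static async Task<TResult[]> ExecuteInParallelOrdered<T, TResult>(
        this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        var results = new TResult[source.Count];

        // Attach the original position to every element before partitioning.
        IEnumerable<(T item, int index)> indexed =
            source.Select((item, index) => (item, index));

        await Task.WhenAll(
            Partitioner.Create(indexed).GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            var (item, index) = partition.Current;
                            // Each index is written by exactly one worker,
                            // so no lock is needed around the array.
                            results[index] = await body(item);
                        }
                    }
                })));

        return results;
    }
}
```

Because every worker writes to a distinct slot, there is no per-partition list to flatten afterwards, and the array is only read after Task.WhenAll has completed.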

Areopagus answered 18/6, 2015 at 6:21 Comment(6)
Oh right, keep the lists in the synchronous part, and return lists. That’s good! Guess I wasn’t too far off after all. - Ammonic
@JonSkeet: doesn't compile (list.Add instead of results.Add) but it's still not working - Pooch
@Dunken: I've fixed the list.Add, but please give more information than "it's still not working" - that really doesn't provide much for me to go on. (Again, if you'd provided a short but complete program demonstrating the problem, it would have been easier for me to test it for myself...) - Areopagus
@JonSkeet: I meant it still doesn't compile... I'm working on an example... For now: The await operator in the 3rd line can only be used within an async method... - Pooch
@Dunken: Okay, if you'd said that before I'd have fixed it immediately - the method just needs to be an async method. Always say what the error is, rather than just "it doesn't work". - Areopagus
@Dunken: Doh, typo. Should be async instead of await. Try now... will try compiling it myself, too... - Areopagus

Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]>, based on this API. Here is an implementation that targets .NET 8:

/// <summary>
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            if (index >= results.Count)
                CollectionsMarshal.SetCount(results, index + 1);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

This implementation supports all the options and the functionality of the Parallel.ForEachAsync overload that has an IEnumerable<T> as source. Its behavior in case of errors and cancellation is identical. The results are arranged in the same order as the associated elements in the source sequence.

The CollectionsMarshal.SetCount is an advanced API that was introduced in .NET 8. It alters the Count of a List<T>, exposing uninitialized data when increased. For a less modern (and slightly less performant) approach that runs on .NET 6, see the 5th revision of this answer.

Khano answered 15/2, 2022 at 16:7 Comment(13)
Nice! There should be an overload built-in with an index parameter to the delegate. The only thing I would change at first glance: I would use a ConcurrentDictionary<int, TResult> instead of a List<TResult> and I would remove the lock. - Microanalysis
@Microanalysis the Parallel.ForEachAsync overload with indices sounds like a legit idea. You could consider submitting an API suggestion to the dotnet/runtime repository. I would give it a ~10% chance to be accepted (after a few years of deliberations). As for switching to a ConcurrentDictionary<K,V> for storing the results, it is also a valid idea. I avoided it because it will immediately increase the memory consumption, while the performance benefits will most likely be negligible. - Khano
The Parallel.ForEach already has such an overload, so it might be added eventually. - Microanalysis
@Microanalysis it might, but don't hold your breath. The evolution of .NET APIs is a slow and hesitant process. :-) - Khano
@TheodorZoulias Followed a thread of SO questions, answers and comments from you to get here. In a different attempt to implement a ParallelForEachAsync you commented about CancellationToken usage here: #15137042. I'm surprised to not see any 'check for is cancelled' in the code above. Could you explain how that works? - Leandro
@TheodorZoulias On the same ParallelForEachAsync solution, you commented about WhenAll being suitable for a small number of tasks or ones that never throw errors, here: #15137042. I have approximately <= 25 REST calls I need to make. I just want to either await all results or cancel as soon as the first one fails (cancelling all other requests). Is the best practice to just use this method and set degreeOfParallelism to the size of my 'task list'? - Leandro
@Leandro the cancellation here is handled by the native Parallel.ForEachAsync itself. If you want to cancel the parallel operation, you can configure the CancellationToken property of the ParallelOptions. Regarding how to configure the MaxDegreeOfParallelism property, there is no one-size-fits-all rule, because it is highly dependent on the capabilities of the hardware, network, etc. that do the actual work. I have posted some suggestions about this here. - Khano
@TheodorZoulias Thanks for the info. I've posted a new question #76502426 to try and explain what I want. - Leandro
@TheodorZoulias Why not open a PR to dotnet/runtime for this? - Chauncey
@JohnZabroski at least one proposal already exists: API Proposal: Provide a result returning Parallel.ForEachAsync overload. - Khano
@TheodorZoulias Ran into an issue (or request) using your implementation. This runs all tasks to completion, right? And if any throw exceptions, returns all exceptions... I wonder if it would be better to return all results and exceptions so it is similar to inspecting results from Task.WhenAll, where you can look to see if it ran to completion, exception, or canceled? Unless I'm not following the code right, it seems you can only get results (if all succeed) or an exception is thrown (with all tasks that threw exceptions). - Leandro
@Leandro yes, any exception in the body will cause the ForEachAsync to complete in a faulted state, with no results returned. It has the same behavior as PLINQ: items.AsParallel().Select(x => Process(x)).ToList();. It's all or nothing. If you want a fault-tolerant solution, you have to catch exceptions manually inside the body and return a (TResult Result, Exception Exception)[] (an array of value tuples) instead of TResult[]. It should be a fairly easy modification. - Khano
@Leandro if you want it to be extra cute you could use Stephen Cleary's Try<T> struct, which is a wrapper monad that represents either a value or an exception: Try<TResult> result = await Try.Create(() => body(item, ct)).ConfigureAwait(false); - Khano
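The fault-tolerant modification suggested in the comments might be sketched as follows. This is an illustration rather than the answer's actual code: the names ParallelEx and ForEachSafeAsync are hypothetical, the source is assumed to be an IReadOnlyList<TSource> so that results can go into a preallocated array, and nullable annotations are left out for brevity. It requires .NET 6 or later for Parallel.ForEachAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelEx
{
    // Hypothetical variant: every element yields either a result or the
    // exception thrown by body, stored at the element's original index.
    public static async Task<(TResult Result, Exception Exception)[]> ForEachSafeAsync<TSource, TResult>(
        IReadOnlyList<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(parallelOptions);
        ArgumentNullException.ThrowIfNull(body);

        var results = new (TResult Result, Exception Exception)[source.Count];

        // Iterate over the indices so that each iteration knows its own slot.
        await Parallel.ForEachAsync(Enumerable.Range(0, source.Count), parallelOptions,
            async (index, ct) =>
            {
                try
                {
                    TResult result = await body(source[index], ct).ConfigureAwait(false);
                    results[index] = (result, null);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // Record the failure instead of faulting the whole loop.
                    results[index] = (default, ex);
                }
            }).ConfigureAwait(false);

        return results;
    }
}
```

Cancellation exceptions are deliberately not swallowed in this sketch, so cancelling through the CancellationToken of the ParallelOptions still stops the loop. Callers can then check each entry's Exception for null to tell successes from failures.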
