The problem of parallelizing asynchronous operations has been solved with the introduction of the Parallel.ForEachAsync
API in .NET 6, but people who are using older .NET platforms might still need a decent substitute. An easy way to implement one is to use an ActionBlock<T>
component from the TPL Dataflow library. This library is included in the standard .NET libraries (.NET Core and .NET 5+), and available as a NuGet package for the .NET Framework. Here is how it can be used:
public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
int maxDegreeOfParallelism, Func<T, Task> action)
{
var options = new ExecutionDataflowBlockOptions();
options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
var block = new ActionBlock<T>(action, options);
foreach (var item in source) block.Post(item);
block.Complete();
return block.Completion;
}
This solution is only suitable for materialized source
sequences, hence the type of the parameter is ICollection<T>
instead of the more common IEnumerable<T>
. It also has the surprising behavior of ignoring any OperationCanceledException
s thrown by the action
. Addressing these nuances and attempting to replicate precisely the features and behavior of the Parallel.ForEachAsync
is doable, but it requires almost as much code as if more primitive tools were used. I've posted such an attempt in the 9th revision of this answer.
Below is a different attempt to implement the Parallel.ForEachAsync
method, offering exactly the same features as the .NET 6 API, and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desirable MaxDegreeOfParallelism
, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how the Parallel.ForEachAsync
is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:
public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
ParallelOptions parallelOptions,
Func<T, CancellationToken, Task> body)
{
if (source == null) throw new ArgumentNullException("source");
if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
if (body == null) throw new ArgumentNullException("body");
int dop = parallelOptions.MaxDegreeOfParallelism;
if (dop < 0) dop = Environment.ProcessorCount;
CancellationToken cancellationToken = parallelOptions.CancellationToken;
TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;
IEnumerator<T> enumerator = source.GetEnumerator();
CancellationTokenSource cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
var workerTasks = new Task[dop];
for (int i = 0; i < dop; i++)
{
workerTasks[i] = Task.Factory.StartNew(async () =>
{
try
{
while (true)
{
if (cts.IsCancellationRequested)
{
cancellationToken.ThrowIfCancellationRequested();
break;
}
T item;
await semaphore.WaitAsync(); // Continue on captured context.
try
{
if (!enumerator.MoveNext()) break;
item = enumerator.Current;
}
finally { semaphore.Release(); }
await body(item, cts.Token); // Continue on captured context.
}
}
catch { cts.Cancel(); throw; }
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
.Unwrap();
}
return Task.WhenAll(workerTasks).ContinueWith(t =>
{
// Clean up (dispose all disposables)
using (enumerator) using (cts) using (semaphore) { }
return t;
}, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
There is a difference in the signature. The body
parameter is of type Func<TSource, CancellationToken, Task>
instead of Func<TSource, CancellationToken, ValueTask>
. This is because value-tasks are a relatively recent feature, and are not available in .NET Framework.
There is also a difference in the behavior. This implementation reacts to OperationCanceledException
s thrown by the body
, by completing as canceled. The correct behavior would be to propagate these exceptions as individual errors, and complete as faulted. Fixing this minor flaw is doable, but I preferred to not complicate further this relatively short and readable implementation.
Parallel.ForEachAsync
API. – Dearborn