The .NET platform has a nice interface that can represent parent-child relationships, the IGrouping<TKey, TElement> interface. It is simply an IEnumerable<TElement> that also has a Key property. The key can be anything, and in this case it could be the Page that needs to be processed. The contents of the grouping could be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that can process IGrouping<TKey, TInput> objects by processing each TInput independently, then aggregating the results per grouping, and finally outputting them as IGrouping<TKey, TOutput> objects. Below is an implementation of this idea:

    public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
        CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            Func<TKey, TInput, Task<TOutput>> transform,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

        var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
        {
            // An exception thrown by the following line would cause buggy behavior.
            // According to the documentation it should never fail.
            taskTask.RunSynchronously();
            return taskTask.Unwrap();
        }, dataflowBlockOptions);

        var completionCTS = new CancellationTokenSource();
        _ = actionBlock.Completion
            .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

        var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
            IGrouping<TKey, TOutput>>(async grouping =>
        {
            if (grouping == null) throw new InvalidOperationException("Null grouping.");
            var tasks = new List<Task<TOutput>>();
            foreach (var item in grouping)
            {
                // Create a cold task that will be either executed by the actionBlock,
                // or will be canceled by the completionCTS. This should eliminate
                // any possibility that an awaited task will remain cold forever.
                var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                    completionCTS.Token);
                var accepted = await actionBlock.SendAsync(taskTask);
                if (!accepted)
                {
                    // The actionBlock has failed.
                    // Skip the rest of the items. Pending tasks should still be awaited.
                    tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                    break;
                }
                tasks.Add(taskTask.Unwrap());
            }
            TOutput[] results = await Task.WhenAll(tasks);
            return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
        }, dataflowBlockOptions);

        // Cleanup
        _ = transformBlock.Completion
            .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
        _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
            .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

        return transformBlock;
    }

    // Overload with synchronous lambda
    public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
        CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            Func<TKey, TInput, TOutput> transform,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
    }

This implementation consists of two blocks, a TransformBlock that processes the groupings and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends the items to be processed to the ActionBlock one by one, then waits for the results, and finally constructs the output IGrouping<TKey, TOutput> with the following tricky line:

    return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping

This compensates for the fact that there is currently no public class in the .NET platform that implements the IGrouping interface. The GroupBy + Single combo does the trick, but it has the limitation that it cannot create empty IGroupings, because Single throws when the results are empty. In case this is an issue, creating a class that implements this interface is always an option. Implementing one is quite straightforward (here is an example).
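
For illustration, a minimal sketch of such a class could look like the following. The name Grouping and the shape of its constructor are assumptions made here for the example; this is not a type that ships with .NET.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of .NET: a minimal IGrouping implementation
// that, unlike GroupBy + Single, can also represent an empty grouping.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        if (elements == null) throw new ArgumentNullException(nameof(elements));
        Key = key;
        // Take a snapshot, so later changes to the source are not observed.
        _elements = elements.ToArray();
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

With a class like this available, the tricky line inside the TransformBlock could presumably be replaced with return new Grouping<TKey, TOutput>(grouping.Key, results); which would also work when the incoming grouping contains no items.
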
Usage example of the CreateTransformGroupingBlock method:

    var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
    {
        Image[] images = GetImagesFromDB(page);
        return images.GroupBy(_ => page).Single(); // Convert to IGrouping
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
    {
        await UploadImage(image);
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
    {
        var page = grouping.Key;
        foreach (var image in grouping) SaveImageToDB(image, page);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    processPages.LinkTo(uploadImages);
    uploadImages.LinkTo(savePages);

The type of the uploadImages variable is TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example the types TInput and TOutput are the same, because the images need not be transformed.

A BoundedCapacity smaller than the MaxDegreeOfParallelism will reduce the degree of parallelism to the value of the capacity. In other words, the block cannot process 8 images simultaneously if it is allowed to buffer only one. – Dupre