A proper implementation of a Splitter and an Aggregator block would be way too complex to implement, and too cumbersome to use. So I came up with a simpler API that encapsulates two blocks, a master block and a detail block, each configured with its own separate set of processing options. The master block executes the splitting and the aggregating actions, while the detail block executes the transformation of each detail. The only requirement regarding the two sets of options is that the `CancellationToken` must be the same for both. All other options (`MaxDegreeOfParallelism`, `BoundedCapacity`, `EnsureOrdered`, `TaskScheduler`, etc.) can be set independently for each block.
/// <summary>
/// Creates a master <see cref="TransformBlock{TInput, TOutput}"/> that splits each input
/// into details, transforms every detail on an internal detail block, and aggregates the
/// detail results back into a single output per input.
/// </summary>
/// <param name="split">Asynchronously splits an input into its details. Must not return <c>null</c>.</param>
/// <param name="transformDetail">Asynchronously transforms a single detail.</param>
/// <param name="aggregate">
/// Combines an input with the results of its details, in the order in which
/// <paramref name="split"/> produced the details.
/// </param>
/// <param name="splitAggregateOptions">
/// Options of the master block, which runs <paramref name="split"/> and <paramref name="aggregate"/>.
/// When <c>null</c>, a default <see cref="ExecutionDataflowBlockOptions"/> is used.
/// </param>
/// <param name="transformDetailOptions">
/// Options of the internal detail block, which runs <paramref name="transformDetail"/>.
/// Its <c>CancellationToken</c> must equal the one in <paramref name="splitAggregateOptions"/>.
/// When <c>null</c>, default options carrying that token are used.
/// </param>
/// <returns>
/// The master block. When it completes, the internal detail block is told to complete as well.
/// </returns>
/// <exception cref="ArgumentNullException">A delegate argument is <c>null</c>.</exception>
/// <exception cref="ArgumentException">The two option sets carry different cancellation tokens.</exception>
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, Task<IEnumerable<TDetail>>> split,
    Func<TDetail, Task<TDetailResult>> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    if (split == null) throw new ArgumentNullException(nameof(split));
    if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
    if (transformDetail == null)
        throw new ArgumentNullException(nameof(transformDetail));
    splitAggregateOptions = splitAggregateOptions ??
        new ExecutionDataflowBlockOptions();
    var cancellationToken = splitAggregateOptions.CancellationToken;
    transformDetailOptions = transformDetailOptions ??
        new ExecutionDataflowBlockOptions() { CancellationToken = cancellationToken };
    if (transformDetailOptions.CancellationToken != cancellationToken)
        throw new ArgumentException(
            "Incompatible options: the CancellationToken must be the same for both blocks.",
            nameof(transformDetailOptions));

    // The detail block receives cold (not yet started) tasks and runs them itself, so that
    // its own MaxDegreeOfParallelism/TaskScheduler options govern the detail transformations.
    var detailTransformer = new ActionBlock<Task<Task<TDetailResult>>>(async task =>
    {
        try
        {
            task.RunSynchronously();
            await task.Unwrap().ConfigureAwait(false);
        }
        catch { } // Suppress exceptions (errors are propagated through the task)
    }, transformDetailOptions);

    var masterBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        IEnumerable<TDetail> details = await split(item); // continue on captured context
        if (details == null)
            throw new InvalidOperationException("The split function returned null.");
        TDetailResult[] detailResults = await Task.Run(async () =>
        {
            var tasks = new List<Task<TDetailResult>>();
            foreach (var detail in details)
            {
                // Created with the shared token so that, if it is canceled before the
                // detail block gets to run it, the task transitions to Canceled.
                var taskFactory = new Task<Task<TDetailResult>>(
                    () => transformDetail(detail), cancellationToken);
                var accepted = await detailTransformer.SendAsync(taskFactory,
                    cancellationToken).ConfigureAwait(false);
                if (!accepted)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    throw new InvalidOperationException("Unexpected detail rejection.");
                }
                var task = taskFactory.Unwrap();
                // Assume that the detailTransformer will never fail, and so the task
                // will eventually complete. Guarding against this unlikely scenario
                // with Task.WhenAll(task, detailTransformer.Completion) seems overkill.
                tasks.Add(task);
            }
            // WhenAll preserves the order of the tasks, i.e. the order of the details.
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }); // continue on captured context
        return aggregate(item, detailResults);
    }, splitAggregateOptions);

    // Once the master block has completed, none of its delegates will send details again.
    // Let the detail block drain anything still queued and then complete, instead of
    // leaving it incomplete forever.
    masterBlock.Completion.ContinueWith(_ => detailTransformer.Complete(),
        CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    return masterBlock;
}
/// <summary>
/// Overload for synchronous delegates: wraps <paramref name="split"/> and
/// <paramref name="transformDetail"/> so that they return already-completed tasks,
/// then forwards every argument to the asynchronous overload.
/// </summary>
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, IEnumerable<TDetail>> split,
    Func<TDetail, TDetailResult> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
    => CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
        split: input => Task.FromResult(split(input)),
        transformDetail: element => Task.FromResult(transformDetail(element)),
        aggregate: aggregate,
        splitAggregateOptions: splitAggregateOptions,
        transformDetailOptions: transformDetailOptions);
Below is a usage example of this block. The inputs are strings containing comma-separated numbers. Each string is split, then each number is doubled, and finally the doubled numbers of each input string are summed.
// Each input line holds comma-separated integers: split it into numbers, double each
// number, then print and return the sum of the doubled values for that line.
var processor = CreateSplitterAggregatorBlock<string, int, int, int>(
    split: text => text.Split(',').Select(token => int.Parse(token)),
    transformDetail: value => value * 2,
    aggregate: (text, doubled) =>
    {
        int total = doubled.Sum();
        Console.WriteLine($"[{text}] => {total}");
        return total;
    });

foreach (var line in new[] { "1, 2, 3", "4, 5", "6, 7, 8, 9" })
{
    processor.Post(line);
}
processor.Complete();

// The sums themselves are not needed here; discard them so the block can complete.
processor.LinkTo(DataflowBlock.NullTarget<int>());
processor.Completion.Wait();
Output:
[1, 2, 3] => 12
[4, 5] => 18
[6, 7, 8, 9] => 60