Combining dataflow results

I am developing a Dataflow pipeline that reads a collection of files and, for each line in each file, runs a series of Dataflow blocks.

After all steps have completed for every line in a file, I want to execute further blocks on the file itself, but I don't know how to do this.

It is straightforward to split processing via a TransformManyBlock, but how can one then consolidate?
I am used to Apache Camel's Splitter and Aggregator functionality. Or is there a fundamental disconnect between Dataflow's intent and my desired usage?

Silvio answered 9/8, 2017 at 4:30 Comment(1)
Related: Dataflow with splitting work to small jobs and then group again - Diaeresis

You should probably look into JoinBlock and BatchedJoinBlock. Both can join two or three sources, and you can set up a filter on the links that feed them so that each one gathers only specific items.
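As a minimal sketch of what a two-source JoinBlock does, the snippet below pairs one item from each source into a tuple. The class name `JoinBlockSketch`, the method `PairOnce`, and the sample values are hypothetical and serve only as illustration; note that the number of sources is fixed by the block's type parameters.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class JoinBlockSketch
{
    // Hypothetical helper: feeds one item to each target of a JoinBlock
    // and returns the tuple the block produces from them.
    public static Tuple<string, int> PairOnce(string name, int count)
    {
        var join = new JoinBlock<string, int>();

        // Each target accepts items of exactly one of the two types.
        join.Target1.Post(name);
        join.Target2.Post(count);

        // Blocks until the join has one item from every target.
        return join.Receive();
    }
}
```

Calling `JoinBlockSketch.PairOnce("a.txt", 3)` returns a tuple whose `Item1` is `"a.txt"` and whose `Item2` is `3`.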

Some useful links for you:

Dashboard answered 9/8, 2017 at 15:54 Comment(3)
I may be misunderstanding the paradigm, but I'm not sure how a JoinBlock would work in my use case, as I have n inputs to wait for. Each file may have, say, 1,000 lines to process. Each line has a sequence of blocks for processing its contents. I want to continue processing the file once all blocks for all lines in the file have completed. Conceptually, these are n sequences of sub-flows per file. A JoinBlock looks suitable for processing a fixed set of inputs. - Silvio
From your question this wasn't obvious. This situation can be solved in different ways, for example by filtering the blocks or by increasing the MaxDegreeOfParallelism. Without your code it's hard to help you. - Dashboard
Did you find a solution to your problem? I have a very similar issue and would love to hear how you implemented it. In my opinion, you'd have to implement your own block. Thanks for your response. - Gerge

A proper implementation of a Splitter and an Aggregator block would be way too complex to implement, and too cumbersome to use. So I came up with a simpler API, that encapsulates two blocks, a master block and a detail block. The processing options for each block are different. The master block executes the splitting and the aggregating actions, while the detail block executes the transformation of each detail. The only requirement regarding the two separate sets of options is that the CancellationToken must be the same for both. All other options (MaxDegreeOfParallelism, BoundedCapacity, EnsureOrdered, TaskScheduler etc) can be set independently for each block.

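// Required namespaces:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Place the methods below inside a static class of your choice.
public static TransformBlock<TInput, TOutput>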
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, Task<IEnumerable<TDetail>>> split,
    Func<TDetail, Task<TDetailResult>> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    if (split == null) throw new ArgumentNullException(nameof(split));
    if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
    if (transformDetail == null)
        throw new ArgumentNullException(nameof(transformDetail));
    splitAggregateOptions = splitAggregateOptions ??
        new ExecutionDataflowBlockOptions();
    var cancellationToken = splitAggregateOptions.CancellationToken;
    transformDetailOptions = transformDetailOptions ??
        new ExecutionDataflowBlockOptions() { CancellationToken = cancellationToken };
    if (transformDetailOptions.CancellationToken != cancellationToken)
        throw new ArgumentException("Incompatible options", "CancellationToken");

    var detailTransformer = new ActionBlock<Task<Task<TDetailResult>>>(async task =>
    {
        try
        {
            task.RunSynchronously();
            await task.Unwrap().ConfigureAwait(false);
        }
        catch { } // Suppress exceptions (errors are propagated through the task)
    }, transformDetailOptions);

    return new TransformBlock<TInput, TOutput>(async item =>
    {
        IEnumerable<TDetail> details = await split(item); //continue on captured context
        TDetailResult[] detailResults = await Task.Run(async () =>
        {
            var tasks = new List<Task<TDetailResult>>();
            foreach (var detail in details)
            {
                var taskFactory = new Task<Task<TDetailResult>>(
                    () => transformDetail(detail), cancellationToken);
                var accepted = await detailTransformer.SendAsync(taskFactory,
                    cancellationToken).ConfigureAwait(false);
                if (!accepted)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    throw new InvalidOperationException("Unexpected detail rejection.");
                }
                var task = taskFactory.Unwrap();
                // Assume that the detailTransformer will never fail, and so the task
                // will eventually complete. Guarding against this unlikely scenario
                // with Task.WhenAny(task, detailTransformer.Completion) seems overkill.
                tasks.Add(task);
            }
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }); // continue on captured context
        return aggregate(item, detailResults);
    }, splitAggregateOptions);
}

// Overload with synchronous lambdas
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, IEnumerable<TDetail>> split,
    Func<TDetail, TDetailResult> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    return CreateSplitterAggregatorBlock(
        item => Task.FromResult(split(item)),
        detail => Task.FromResult(transformDetail(detail)),
        aggregate, splitAggregateOptions, transformDetailOptions);
}

Below is a usage example of this block. The inputs are strings containing comma-separated numbers. Each string is split, then each number is doubled, and finally the doubled numbers of each input string are summed.

var processor = CreateSplitterAggregatorBlock<string, int, int, int>(split: str =>
{
    var parts = str.Split(',');
    return parts.Select(part => Int32.Parse(part));
}, transformDetail: number =>
{
    return number * 2;
}, aggregate: (str, numbersArray) =>
{
    var sum = numbersArray.Sum();
    Console.WriteLine($"[{str}] => {sum}");
    return sum;
});

processor.Post("1, 2, 3");
processor.Post("4, 5");
processor.Post("6, 7, 8, 9");
processor.Complete();
processor.LinkTo(DataflowBlock.NullTarget<int>());
processor.Completion.Wait();

Output:

[1, 2, 3] => 12
[4, 5] => 18
[6, 7, 8, 9] => 60
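
For comparison, here is a much simpler sketch of the same split/transform/aggregate idea that uses a single TransformBlock and Task.WhenAll. It is an assumption-laden simplification, not a replacement for the helper above: the details run on the thread pool with no shared limit on their parallelism, and the names `PerItemAggregationSketch` and `SumDoubledAsync` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PerItemAggregationSketch
{
    // Hypothetical helper: for each input string, splits it on commas,
    // doubles every number concurrently, and sums the doubled numbers.
    public static async Task<int[]> SumDoubledAsync(string[] inputs)
    {
        var block = new TransformBlock<string, int>(async input =>
        {
            // Split: one task per detail (no shared parallelism limit here).
            var tasks = input.Split(',')
                .Select(part => Task.Run(() => int.Parse(part) * 2));

            // Aggregate: runs only after every detail of this input is done.
            int[] results = await Task.WhenAll(tasks);
            return results.Sum();
        });

        foreach (var input in inputs) block.Post(input);
        block.Complete();

        // Drain the outputs; the default EnsureOrdered keeps input order.
        var output = new List<int>();
        while (await block.OutputAvailableAsync())
            output.Add(block.Receive());

        await block.Completion;
        return output.ToArray();
    }
}
```

With the same three input strings as above, this returns the sums 12, 18 and 60 in that order.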
Diaeresis answered 18/6, 2020 at 21:18 Comment(0)
