TPL Dataflow: guarantee completion only when ALL source data blocks have completed

How can I rewrite the code so that it completes only when BOTH transform blocks have completed? I thought completion meant that a block is marked complete AND its "out queue" is empty.

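using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, string> transformBlock1;
    private TransformBlock<int, string> transformBlock2;
    private ActionBlock<string> processorBlock;

    public Test()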
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

I edited the code, adding an input buffer count for each transform block. Clearly, all 100 items are streamed to each of the transform blocks. But as soon as one of the transform blocks finishes, the processor block does not accept any more items, and the incomplete transform block just flushes its input buffer.

Shulamite asked 22/11, 2012 at 10:0 Comment(2)
Beware that the transform blocks might not receive all messages from the broadcast block. They only receive the latest message. If the broadcast block is offered messages faster than the transform block can receive them, the transform block will miss messages. Also, you should await SendAsync(i) if you want to ensure message order, etc. – Pricilla
@Pricilla I'm not sure about your assertion that the TransformBlock will miss messages. The docs state that BroadcastBlock guarantees that it will propagate to all linked targets before accepting a new item. If BoundedCapacity is unbounded on the target, then the target TransformBlock will buffer messages. Now, if BoundedCapacity is set, then it will block until there's capacity, and that might result in dropped messages, especially with the unawaited SendAsync. In short, my understanding is that if unbounded, no messages are dropped. – Hallock

The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into “finishing mode”: it will process remaining items in its input queue, but it won't accept any new items.
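A minimal sketch of that "finishing mode", using only the standard System.Threading.Tasks.Dataflow API (the class and method names are illustrative): once Complete() has been called, Post returns false, while items that were already queued are still processed.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class FinishingModeDemo
{
    // Posts one item before and one after Complete() and reports whether each was accepted.
    public static (bool Before, bool After) Run()
    {
        var processorBlock = new ActionBlock<string>(s => Console.WriteLine(s));

        bool before = processorBlock.Post("accepted"); // true: the block is still open
        processorBlock.Complete();                       // enter "finishing mode"
        bool after = processorBlock.Post("declined");  // false: new items are refused

        // Items that were already queued are still processed before Completion finishes.
        processorBlock.Completion.Wait();
        return (before, after);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"Before Complete(): {result.Before}, after Complete(): {result.After}");
    }
}
```

This is exactly what happens to the processor block when the faster transform block propagates its completion: items still arriving from the slower transform block are declined.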

There is a simpler fix than splitting your processor block in two, though: don't set PropagateCompletion on the links into the processor block; instead, complete the processor block manually when both transform blocks have completed:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
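To show where the Task.WhenAll call fits, here is a minimal, self-contained sketch of the question's pipeline with this fix applied, registered right after the LinkTo calls. It is an illustration, not the original code: the Console output in the processor is replaced by a hypothetical counter so the result can be checked, and Post is used instead of SendAsync for brevity.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WhenAllCompletionDemo
{
    // Builds the question's pipeline and returns how many items reached the processor.
    public static int Run(int numElements)
    {
        int processed = 0;

        var broadCastBlock = new BroadcastBlock<int>(i => i);
        var transformBlock1 = new TransformBlock<int, string>(i => { Thread.Sleep(50); return "1_" + i; });
        var transformBlock2 = new TransformBlock<int, string>(i => { Thread.Sleep(20); return "2_" + i; });
        // Hypothetical counter instead of Console.WriteLine, so the result can be checked.
        var processorBlock = new ActionBlock<string>(s => { Interlocked.Increment(ref processed); });

        // The broadcast block is the only source of each transform block,
        // so propagating completion over these two links is safe.
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadCastBlock.LinkTo(transformBlock1, propagate);
        broadCastBlock.LinkTo(transformBlock2, propagate);

        // No PropagateCompletion here: the processor block has two sources.
        transformBlock1.LinkTo(processorBlock);
        transformBlock2.LinkTo(processorBlock);

        // Complete the processor block only after BOTH transform blocks have completed.
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
            .ContinueWith(_ => processorBlock.Complete());

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed items: " + Run(100));
        Console.WriteLine("Finished");
    }
}
```

Keeping PropagateCompletion on the broadcast links means calling broadCastBlock.Complete() still shuts down the whole network; only the fan-in point needs the manual continuation.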
Puckett answered 23/11, 2012 at 21:52 Comment(5)
Exactly what I was looking for. I was not aware that Task.WhenAll returns an awaitable task; my negligence. – Shulamite
I need the very same; maybe it's too late, but could you post an update about where I need to add the Task.WhenAll construct? – Indore
@AttilaHajdrik Probably at the end of your dataflow setup code, near your LinkTo calls. – Puckett
I've tried that, but it was not working; then again, it was 1:30 AM... I've since modified my code to use plain TPL and separated some tasks, and I think I was able to solve it well without Dataflow. – Indore
Note that this solution doesn't propagate a faulted state through the network. – Mechanotherapy

The issue here is that you are setting the PropagateCompletion property each time you call the LinkTo method to link the blocks, combined with the difference in wait times in your transformation blocks.

From the documentation for the Complete method on the IDataflowBlock interface (emphasis mine):

Signals to the IDataflowBlock that it should not accept nor produce any more messages nor consume any more postponed messages.

Because you stagger out your wait times in each of the TransformBlock<TInput, TOutput> instances, transformBlock2 (waiting for 20 ms) is finished before transformBlock1 (waiting for 50 ms). transformBlock2 completes first, and then sends the signal to processorBlock which then says "I'm not accepting anything else" (and transformBlock1 hasn't produced all of its messages yet).

Note that transformBlock2 finishing before transformBlock1 is not absolutely guaranteed; it's feasible that the thread pool (assuming you're using the default scheduler) will process the tasks in a different order (but more than likely it will not, as it will steal work from the queues once the 20 ms items are done).

Your pipeline looks like this:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

In order to get around this, you want to have a pipeline that looks like this:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

This is accomplished by simply creating two separate ActionBlock<TInput> instances, like so:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

You then need to wait on both processor blocks instead of just one:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

A very important note here: when creating an ActionBlock<TInput>, the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions instance passed to it defaults to one.

This means that the calls to the Action<T> delegate that you pass to the ActionBlock<TInput> are serialized: only one executes at a time.

Because you now have two ActionBlock<TInput> instances pointing to the same Action<T> delegate, you aren't guaranteed thread-safety.

If your method is thread-safe, then you don't have to do anything (which would allow you to set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded, since there's no reason to block).
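A minimal sketch of the thread-safe case, assuming the shared action only increments a counter with Interlocked (the counter and class name are hypothetical): both processor blocks are created with MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnboundedParallelismDemo
{
    private static int _count;

    // Sends numItems items through each of two processor blocks sharing one thread-safe action.
    public static int Run(int numItems)
    {
        _count = 0;

        // Thread-safe: Interlocked makes concurrent increments safe.
        Action<string> a = i => Interlocked.Increment(ref _count);

        // Only reasonable because 'a' is thread-safe; the default degree of parallelism is 1.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var processorBlock1 = new ActionBlock<string>(a, options);
        var processorBlock2 = new ActionBlock<string>(a, options);

        for (int i = 0; i < numItems; i++)
        {
            processorBlock1.Post("1_" + i);
            processorBlock2.Post("2_" + i);
        }
        processorBlock1.Complete();
        processorBlock2.Complete();

        Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
        return Volatile.Read(ref _count);
    }

    public static void Main()
    {
        Console.WriteLine("Processed items: " + Run(1000));
    }
}
```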

If it's not thread-safe, and you need to guarantee it, you need to resort to traditional synchronization primitives, like the lock statement.

In this case, you'd do it like so (although it's clearly not needed, as the WriteLine method on the Console class is thread-safe):

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...
Wellchosen answered 23/11, 2012 at 17:7 Comment(2)
Thanks for the lengthy answer, but I chose svick's answer because it directly applies to TPL Dataflow and offers a very concise and simple solution. – Shulamite
You can easily avoid locking if you use the same ExclusiveScheduler for both action blocks. – Puckett
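A minimal sketch of that suggestion, assuming .NET's ConcurrentExclusiveSchedulerPair: both action blocks get the pair's ExclusiveScheduler through ExecutionDataflowBlockOptions.TaskScheduler, so their delegates never run at the same time and a non-thread-safe List<string> can be shared without a lock. The class name and the list are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExclusiveSchedulerDemo
{
    // Runs two action blocks on one exclusive scheduler and returns how many items were recorded.
    public static int Run(int numItems)
    {
        // List<T> is NOT thread-safe; the exclusive scheduler is what keeps this safe.
        var results = new List<string>();
        Action<string> a = i => results.Add(i);

        // Tasks queued to ExclusiveScheduler never run concurrently with each other.
        var pair = new ConcurrentExclusiveSchedulerPair();
        var options = new ExecutionDataflowBlockOptions { TaskScheduler = pair.ExclusiveScheduler };

        var processorBlock1 = new ActionBlock<string>(a, options);
        var processorBlock2 = new ActionBlock<string>(a, options);

        for (int i = 0; i < numItems; i++)
        {
            processorBlock1.Post("1_" + i);
            processorBlock2.Post("2_" + i);
        }
        processorBlock1.Complete();
        processorBlock2.Complete();

        Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
        return results.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Recorded items: " + Run(1000));
    }
}
```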

An addition to svick's answer: to be consistent with the behaviour you get with the PropagateCompletion option, you also need to forward exceptions in case a preceding block faulted. An extension method like the following takes care of that as well:

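using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Extension methods must live in a static, non-generic class.
public static class DataflowExtensions
{
    // Completes target once all sources have completed, or faults it if any source faulted.
    public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
        if (target == null) return;
        if (sources.Length == 0) { target.Complete(); return; }
        Task.Factory.ContinueWhenAll(
            sources.Select(b => b.Completion).ToArray(),
            tasks => {
                // Collect the exceptions of every faulted source.
                var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
                if (exceptions.Count != 0) {
                    target.Fault(new AggregateException(exceptions));
                } else {
                    target.Complete();
                }
            }
        );
    }
}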
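For comparison, this is the built-in behaviour that the extension method mimics: with PropagateCompletion = true, a faulted source faults its target. A minimal sketch using a BufferBlock<int> as the only source (the class name is illustrative):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultPropagationDemo
{
    // Faults the only source of a target and returns the target's final status.
    public static TaskStatus Run()
    {
        var source = new BufferBlock<int>();
        var target = new ActionBlock<int>(_ => { });
        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

        // Simulate an upstream failure.
        ((IDataflowBlock)source).Fault(new InvalidOperationException("upstream failure"));

        try
        {
            target.Completion.Wait();
        }
        catch (AggregateException ex)
        {
            // The original exception is nested inside the AggregateException.
            Console.WriteLine("Target faulted: " + ex.Flatten().InnerExceptions[0].Message);
        }
        return target.Completion.Status;
    }

    public static void Main()
    {
        Console.WriteLine("Target status: " + Run());
    }
}
```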
Badr answered 9/4, 2013 at 9:49 Comment(0)

Here is a method that is functionally equivalent to pkt's CompleteWhenAll method, but with slightly less code:

public static void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock target)
{
    // Arguments validation omitted
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

        Exception exception = allSourcesCompletion.IsFaulted ?
            allSourcesCompletion.Exception : null;

        if (exception is null) target.Complete(); else target.Fault(exception);
    });
}

Usage example:

PropagateCompletion(new[] { transformBlock1, transformBlock2 }, processorBlock);

The PropagateCompletion method is a variant of a more general method with the same name that I have posted here.

Gaddy answered 6/5, 2020 at 15:30 Comment(0)

Other answers explain clearly why PropagateCompletion = true messes things up when a block has more than one source.

To provide a simple solution to the problem, you may want to look at DataflowEx, an open source library that solves this kind of problem with smarter built-in completion rules. (It uses TPL Dataflow linking internally but supports complex completion propagation. The implementation looks similar to WhenAll but also handles links that are added dynamically. Please check Dataflow.RegisterDependency() and TaskEx.AwaitableWhenAll() for implementation details.)

I slightly changed your code to make everything work using DataflowEx:

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

Full code is here.

Disclaimer: I am the author of DataflowEx, which is published under MIT license.

Ephor answered 19/12, 2014 at 9:1 Comment(3)
Could you please disclose whether you work for Gridsum? My question explicitly mentioned that I need an answer for TPL Dataflow; I did not want to use a third-party solution for this problem. Thanks. – Shulamite
Yes, I work for Gridsum. But the library is completely free and open source, so I thought it might help you. No commercial intent at all. If what you need is about the internal mechanism of TPL Dataflow, please ignore my answer. But if somebody needs a solution, the answer has its value. Thanks :) – Ephor
Updated the answer with a bit more detail. Disclaimer also added. – Ephor

© 2022 - 2024 — McMap. All rights reserved.