Many-to-many TPL Dataflow does not process all inputs

I have a TPL Dataflow pipeline with two sources and two targets linked in a many-to-many fashion. The target blocks appear to complete successfully; however, the pipeline usually drops one or more inputs. I've included the simplest full repro I could come up with below. Any ideas?

Notes:

  1. The problem only occurs if the artificial delay is used while generating the input.
  2. Complete() is successfully called on both sources, but one of the sources' Completion tasks hangs in the WaitingForActivation state, even though both targets complete successfully.
  3. I can't find any documentation stating that many-to-many dataflows aren't supported, and the answer to this question implies they are: https://social.msdn.microsoft.com/Forums/en-US/19d831af-2d3f-4d95-9672-b28ae53e6fa0/completion-of-complex-graph-dataflowgraph-object?forum=tpldataflow
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private const int NumbersPerSource = 10;
    private const int MaxDelayMilliseconds = 10;

    static async Task Main(string[] args)
    {
        int numbersProcessed = 0;

        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();

        var target1 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var target2 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));

        var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
        source1.LinkTo(target1, linkOptions);
        source1.LinkTo(target2, linkOptions);
        source2.LinkTo(target1, linkOptions);
        source2.LinkTo(target2, linkOptions);

        var task1 = Task.Run(() => Post(source1));
        var task2 = Task.Run(() => Post(source2));

        // source1 or source2 Completion tasks may never complete even though Complete is always successfully called.
        //await Task.WhenAll(task1, task2, source1.Completion, source2.Completion, target1.Completion, target2.Completion);
        await Task.WhenAll(task1, task2, target1.Completion, target2.Completion);

        Console.WriteLine($"{numbersProcessed} of {NumbersPerSource * 2} numbers processed.");
    }

    private static async Task Post(BufferBlock<int> source)
    {
        foreach (var i in Enumerable.Range(0, NumbersPerSource)) {
            await Task.Delay(TimeSpan.FromMilliseconds(GetRandomMilliseconds()));
            Debug.Assert(source.Post(i));
        }
        source.Complete();
    }

    private static Random Random = new Random();

    private static int GetRandomMilliseconds()
    {
        lock (Random) {
            return Random.Next(0, MaxDelayMilliseconds);
        }
    }
}
Grandsire asked on 4/9/2020 at 22:46 (5 comments):
Gynandry: The problem is that, because you have linked both sources to both targets and are using PropagateCompletion, the first source to complete shuts down BOTH targets. So the second source ends up posting to closed target blocks, and you miss some numbers.
Grandsire: Ah, that sounds right. The only thing that still has me scratching my head is that the call to source.Post returns true. I would expect it to return false if no target is accepting the message. I'll have to dig into the expected behavior a bit more. Thanks!
Pubescent: Related: TPL Dataflow, guarantee completion only when ALL source data blocks completed, and also TPL Dataflow: process messages from two incoming blocks sequentially.
Gynandry: The bigger question is probably: what problem are you trying to solve by posting many-to-many like this?
Grandsire: I have a financial ticker dataflow that I split into multiple dataflows, each of which processes different metrics (moving averages, etc.). Those are then collated back into one dataflow, with all data points being sent to a trading strategy to generate trading signals. There are other data sources, such as news feeds and database data, that also get collated/joined into the final output, but the ticker data is used by multiple calculations, which is the main reason for the many-to-many dataflows.

As @MikeJ pointed out in a comment, linking the blocks with PropagateCompletion in a many-to-many dataflow configuration can cause the premature completion of some target blocks. In this case target1 and target2 are both marked as completed as soon as either of the two source blocks completes, leaving the other source unable to complete, because there are still messages in its output buffer. These messages are never going to be consumed, because none of the linked target blocks is willing to accept them.
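
For reference, a link created with PropagateCompletion = true behaves roughly as if a continuation like the one below were attached alongside the link (a simplified sketch of the idea, not the actual library implementation), which is why whichever source happens to finish first completes both targets:

public static void LinkWithCompletionPropagation<T>(ISourceBlock<T> source,
    ITargetBlock<T> target)
{
    // Simplified sketch only -- not how the library actually implements it.
    source.LinkTo(target);
    source.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) target.Fault(t.Exception);
        else target.Complete();
    }, TaskScheduler.Default);
}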

To fix this problem you could use the custom PropagateCompletion method below:

public static void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock[] targets)
{
    // Arguments validation omitted
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
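    // Fire-and-forget continuation: once all the sources have completed,
    // complete (or fault) every target. Any exception from the sources is
    // observed below via allSourcesCompletion.IsFaulted, so the empty catch
    // is harmless.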
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

        Exception exception = allSourcesCompletion.IsFaulted ?
            allSourcesCompletion.Exception : null;

        foreach (var target in targets)
        {
            if (exception is null) target.Complete(); else target.Fault(exception);
        }
    });
}

Usage example:

source1.LinkTo(target1);
source1.LinkTo(target2);
source2.LinkTo(target1);
source2.LinkTo(target2);
PropagateCompletion(new[] { source1, source2 }, new[] { target1, target2 });

Notice that no DataflowLinkOptions are passed when linking the sources to the targets in this example.
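
As for the question in the comments about why source.Post keeps returning true: Post only reports whether the BufferBlock itself accepted the message into its own buffer; it does not consult the linked targets. A minimal standalone sketch of that behavior (hypothetical demo code, not part of the original repro):

using System;
using System.Threading.Tasks.Dataflow;

class PostDemo
{
    static void Main()
    {
        var source = new BufferBlock<int>();
        var target = new ActionBlock<int>(i => { });

        source.LinkTo(target);
        target.Complete(); // simulate a target that has already shut down

        // Post returns true because the BufferBlock accepted the message into
        // its buffer; the completed target simply declines it, so the message
        // stays in the buffer and is never processed.
        Console.WriteLine(source.Post(42)); // True
        Console.WriteLine(source.Count);    // 1
    }
}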

Pubescent answered on 5/9/2020 at 1:06 (0 comments).
