TPL Dataflow: Why does EnsureOrdered = false destroy parallelism for this TransformManyBlock?

I'm working on a TPL Dataflow pipeline and noticed some strange behaviour related to ordering/parallelism in TransformManyBlock (it might apply to other blocks as well).

Here is my code to reproduce (.NET 4.7.2, TPL Dataflow 4.9.0):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
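        // Fan each input i out into i tuples. Up to 4 inputs may be processed at once,
        // and EnsureOrdered = false allows results to be emitted without reordering.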
        var sourceBlock = new TransformManyBlock<int, Tuple<int, int>>(i => Source(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false });

        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 10; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.Read();
    }

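    // Lazily yields (i, 0) .. (i, i - 1), sleeping a random interval before each
    // item to simulate slow, streaming work.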
    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}

My desired behaviour is the following:

  • The source block should return tuples in parallel; the only requirement is that, for a given i, they are ordered by the secondary property j.
  • The target block should process messages in the order received.

From what I understand, the secondary ordering condition is already satisfied by the nature of yield return, so EnsureOrdered can be set to false. If it is set to true, the source block withholds messages for an unacceptable amount of time, because it waits for all of the yield returns to complete before passing the messages along (in the real app many GB of data are processed, which means we want to propagate data through the pipeline as quickly as possible so we can release RAM). This is a sample output when EnsureOrdered on the source block is set to true:

Returning (1, 0)
Returning (2, 0)
Returning (4, 0)
Returning (3, 0)
Returning (2, 1)
Returning (4, 1)
Returning (3, 1)
Received (1, 0)
Received (2, 0)
Received (2, 1)
Returning (4, 2)
Returning (3, 2)
Received (3, 0)
Received (3, 1)
Received (3, 2)
Returning (5, 0)
Returning (6, 0)

We can see that the source block works in parallel, but it waits to propagate messages until all the messages for the next i in line have been generated (as expected).

However, when EnsureOrdered for the source block is set to false (as in the code sample above), I get the following output:

Returning (2, 0)
Received (2, 0)
Returning (2, 1)
Received (2, 1)
Returning (4, 0)
Received (4, 0)
Returning (4, 1)
Received (4, 1)
Returning (4, 2)
Received (4, 2)
Returning (4, 3)
Received (4, 3)
Returning (1, 0)
Received (1, 0)
Returning (3, 0)
Received (3, 0)
Returning (3, 1)
Received (3, 1)
Returning (3, 2)
Received (3, 2)

The source block now propagates messages as soon as they are available, but parallelism seems to be lost: it only works on one i at a time.

Why is this? How can I force it to process in parallel?

Mong asked 11/7, 2018 at 2:36

A fix for this is in progress here: https://github.com/dotnet/corefx/pull/31059

Thank you for your report!
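
Until that fix is released, one possible interim workaround (just a sketch, reusing the Source method and targetBlock from the question rather than anything from the linked PR) is to do the fan-out yourself: run Source inside an ActionBlock with MaxDegreeOfParallelism = 4 and push every tuple into a BufferBlock the moment it is produced, instead of handing a lazy enumerable to a TransformManyBlock:

var buffer = new BufferBlock<Tuple<int, int>>();

// Do the fan-out in parallel and forward each tuple immediately.
var fanOut = new ActionBlock<int>(async i =>
{
    foreach (var tpl in Source(i))
    {
        await buffer.SendAsync(tpl);
    }
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// Complete (or fault) the buffer once all fan-out work has finished.
fanOut.Completion.ContinueWith(t =>
{
    if (t.IsFaulted)
        ((IDataflowBlock)buffer).Fault(t.Exception);
    else
        buffer.Complete();
});

buffer.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
    fanOut.Post(i);
}

fanOut.Complete();
targetBlock.Completion.Wait();

Tuples that share the same i still arrive in j order, because each Source(i) enumerable is consumed sequentially by a single worker, while tuples from different i values are free to interleave.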

Creight answered 13/7, 2018 at 21:36 Comment(1)
That's great that the problem was solved so fast, but it would be even better if you described here what happened and why :) – Willtrude
