I'm working on a TPL Dataflow pipeline and noticed some strange behaviour related to ordering/parallelism in TransformManyBlock
s (might apply to other blocks as well).
Here is my code to reproduce (.NET 4.7.2, TPL Dataflow 4.9.0):
class Program
{
static void Main(string[] args)
{
var sourceBlock = new TransformManyBlock<int, Tuple<int, int>>(i => Source(i),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false });
var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
{
Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });
sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });
for (int i = 0; i < 10; i++)
{
sourceBlock.Post(i);
}
sourceBlock.Complete();
targetBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.Read();
}
static IEnumerable<Tuple<int, int>> Source(int i)
{
var rand = new Random(543543254);
for (int j = 0; j < i; j++)
{
Thread.Sleep(rand.Next(100, 1500));
Console.WriteLine($"Returning ({i}, {j})");
yield return Tuple.Create(i, j);
}
}
}
My desired behaviour is the following:
- The source block should return tuples in parallel, the only requirement is that they should be ordered by the secondary property
j
. - The target block should process messages in the order received.
From what I understand, the secondary ordering condition is satisfied by the nature of yield return
, so EnsureOrdered
can be set to false
. If this is set to true
, the source block will withhold messages for an unacceptable amount of time since it will wait for all yield return
to complete before passing the message along (in the real app many GB of data is processed which means that we want to propagate data through the pipeline as quickly as possible so we can release RAM). This is a sample output when EnsureOrdered
of the source block is set to true
:
Returning (1, 0)
Returning (2, 0)
Returning (4, 0)
Returning (3, 0)
Returning (2, 1)
Returning (4, 1)
Returning (3, 1)
Received (1, 0)
Received (2, 0)
Received (2, 1)
Returning (4, 2)
Returning (3, 2)
Received (3, 0)
Received (3, 1)
Received (3, 2)
Returning (5, 0)
Returning (6, 0)
We can see that the source block works in parallel, but waits to propagate messages until all the messages for the next i
in line has been generated (as expected).
However when EnsureOrdered
for the source block is false
(as in the code sample), I get the following output:
Returning (2, 0)
Received (2, 0)
Returning (2, 1)
Received (2, 1)
Returning (4, 0)
Received (4, 0)
Returning (4, 1)
Received (4, 1)
Returning (4, 2)
Received (4, 2)
Returning (4, 3)
Received (4, 3)
Returning (1, 0)
Received (1, 0)
Returning (3, 0)
Received (3, 0)
Returning (3, 1)
Received (3, 1)
Returning (3, 2)
Received (3, 2)
The source block successfully propagates messages when available, however it seems like parallelism is lost since it is just working with one i
at a time.
Why is this? How can I force it to process in parallel?