TPL Dataflow async scheduling

The scheduling of async Tasks in TPL Dataflow does not work as I expected. In the example below, I expected the ActionBlock to process each result from the TransformBlock as soon as it is available. Instead, it waits for the second (delayed) result before it proceeds to the third. What have I misunderstood here? Is there some requirement on the order of processing?

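using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class TestDataFlow
{
    public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

    public async Task Flow()
    {
        watch.Start();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            // Only input 2 is slow; inputs 1 and 3 finish almost immediately.
            if (input == 2)
            {
                await Task.Delay(5000);
            }
            Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        var printSolution = new ActionBlock<int>(input =>
        {
            Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        // Propagate completion so that awaiting printSolution.Completion below
        // waits until every result has been printed.
        plus10.LinkTo(printSolution, new DataflowLinkOptions { PropagateCompletion = true });

        List<int> inputs = new List<int> { 1, 2, 3 };
        foreach (var input in inputs)
        {
            await plus10.SendAsync(input);
        }

        plus10.Complete();
        await printSolution.Completion;
    }
}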

Output:

Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834
Numeral answered 27/11, 2014 at 12:32
Comment: Take a look at this answer of mine about making the mesh ignore ordering. – Hsinking

TPL Dataflow preserves the order of items between a block's input and output queues, no matter how many items are processed in parallel.

"Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message"

From Dataflow (Task Parallel Library)
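To see that guarantee in isolation, here is a small self-contained sketch (the class name `OrderingDemo`, the method `RunAsync` and the delay values are hypothetical, chosen for illustration). It assumes the System.Threading.Tasks.Dataflow library is available, and with default options it gives later inputs shorter delays so that the transform delegate finishes in reverse order, then records both the finish order and the order in which results leave the block.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Hypothetical helper: runs inputs 1..3 through a parallel TransformBlock
    // whose later inputs finish first. Returns (finish order, output order).
    public static async Task<(List<int> Finished, List<int> Output)> RunAsync()
    {
        var finished = new ConcurrentQueue<int>(); // order in which the delegate completes
        var output = new List<int>();              // order in which results leave the block

        var transform = new TransformBlock<int, int>(async input =>
        {
            // Input 1 waits 600 ms, input 2 waits 400 ms, input 3 waits 200 ms.
            await Task.Delay((4 - input) * 200);
            finished.Enqueue(input);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Default MaxDegreeOfParallelism is 1, so only one thread touches the list at a time.
        var collect = new ActionBlock<int>(x => output.Add(x));

        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await transform.SendAsync(input);
        }

        transform.Complete();
        await collect.Completion;

        return (new List<int>(finished), output);
    }

    public static async Task Main()
    {
        var (finished, output) = await RunAsync();
        Console.WriteLine("Finished: " + string.Join(", ", finished)); // typically 3, 2, 1
        Console.WriteLine("Output:   " + string.Join(", ", output));   // 11, 12, 13
    }
}
```

Even though input 3 is done first, 13 is held back until 11 and 12 have been emitted, which is exactly the delay seen in the question's output.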

If you want items to move on to the next block as soon as they finish processing, you have to send them explicitly yourself, which turns your TransformBlock into an ActionBlock:

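// Same options as in the question: up to 4 items processed at once.
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

var printSolution = new ActionBlock<int>(input =>
{
    Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
}, executionDataflowBlockOptions);

var plus10 = new ActionBlock<int>(async input =>
{
    if (input == 2)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
    // Hand the result to printSolution as soon as it is ready, instead of
    // going through a link that would preserve input order.
    await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);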
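One consequence of dropping the link is that completion no longer flows from `plus10` to `printSolution` automatically. The following is a minimal runnable sketch of the same approach with completion forwarded by hand; the class `ExplicitSendDemo`, the method `RunAsync` and the shortened 500 ms delay are hypothetical choices for illustration, and it assumes the System.Threading.Tasks.Dataflow library is available.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExplicitSendDemo
{
    // Hypothetical helper: returns the order in which printSolution received results.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var received = new ConcurrentQueue<int>(); // safe with parallel consumers

        var printSolution = new ActionBlock<int>(input =>
        {
            received.Enqueue(input);
            Console.WriteLine("Solution: {0}", input);
        }, options);

        var plus10 = new ActionBlock<int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500); // shorter than the question's 5000 ms to keep the demo quick
            }
            // Forward the result the moment it is ready; there is no link, so no reordering buffer.
            await printSolution.SendAsync(input + 10);
        }, options);

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        // No link means no PropagateCompletion: wait until plus10 has sent
        // everything, then complete printSolution ourselves.
        plus10.Complete();
        await plus10.Completion;
        printSolution.Complete();
        await printSolution.Completion;

        return new List<int>(received);
    }

    public static async Task Main()
    {
        var received = await RunAsync();
        Console.WriteLine(string.Join(", ", received)); // typically 11, 13, 12
    }
}
```

If `plus10` can fault, `await plus10.Completion` throws and `printSolution` is never completed; handling that case (for example by faulting `printSolution` too) is left out of this sketch.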
Fiberglass answered 27/11, 2014 at 12:39

As of (at least) System.Threading.Tasks.Dataflow 4.6.0, ExecutionDataflowBlockOptions has an EnsureOrdered property, which can be set to false.

To update:

Install-Package System.Threading.Tasks.Dataflow

Code:

var options = new ExecutionDataflowBlockOptions {
  EnsureOrdered = false
};
var transform = new TransformBlock<int, int>(i => Transform(i), options);
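Applied to the pipeline from the question, a minimal sketch could look like the following. It assumes a Dataflow version that has `EnsureOrdered` (4.6.0 or later, per the answer above); the class `UnorderedDemo`, the method `RunAsync` and the shortened 500 ms delay are hypothetical. The link keeps `PropagateCompletion`, so no manual completion forwarding is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedDemo
{
    // Hypothetical helper: returns the order in which printSolution received results.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false // emit each result as soon as it is ready
        };

        var results = new ConcurrentQueue<int>();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500); // the slow item
            }
            return input + 10;
        }, options);

        var printSolution = new ActionBlock<int>(input =>
        {
            results.Enqueue(input);
            Console.WriteLine("Solution: {0}", input);
        });

        plus10.LinkTo(printSolution, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        plus10.Complete();
        await printSolution.Completion;
        return new List<int>(results);
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(string.Join(", ", results)); // typically 11, 13, 12
    }
}
```

With ordering disabled, 13 no longer waits behind 12. The relative order of 11 and 13 is not fixed, since both finish almost immediately in parallel.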

Some more examples: https://mcmap.net/q/1963007/-why-do-blocks-run-in-this-order

Development history, which I thought was neat: https://github.com/dotnet/corefx/issues/536 https://github.com/dotnet/corefx/pull/5191

Tectrix answered 10/9, 2016 at 9:38
