How can I specify an unordered Execution Block using the TPL Dataflow Library?

I want to set up a TransformBlock that processes its items in parallel. Thus, I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1. I don't care about the order of the messages, but the documentation says:

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.
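
The documented behavior can be illustrated with a minimal sketch, assuming the System.Threading.Tasks.Dataflow assembly is available (on .NET Framework it comes from the NuGet package of the same name). The class name OrderedOutputDemo, the delays and the item values are hypothetical, chosen only for illustration: item 1 is made deliberately slow, and the order in which results leave the block is recorded.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// OrderedOutputDemo is a hypothetical name used only for this illustration.
public static class OrderedOutputDemo
{
    // Sends 1..4 through a parallel TransformBlock in which item 1 is slow,
    // then returns the items in the order the block emitted them.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4 // up to 4 items are transformed at once
        };

        var transform = new TransformBlock<int, int>(async n =>
        {
            // Item 1 takes much longer than the others (hypothetical delays).
            await Task.Delay(n == 1 ? 500 : 10);
            return n;
        }, options);

        // The collector runs with the default MaxDegreeOfParallelism of 1,
        // so List.Add is never called concurrently.
        var received = new List<int>();
        var collector = new ActionBlock<int>(n => received.Add(n));
        transform.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++)
        {
            transform.Post(i);
        }
        transform.Complete();
        await collector.Completion;

        // EnsureOrdered defaults to true, so items 2..4 are held back
        // until item 1 has been emitted.
        return received;
    }
}
```

Items 2 to 4 finish their work first, but the block still emits them in input order, so the returned list is 1, 2, 3, 4. In other words, one slow message delays every message queued behind it at the block's output.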

Does "correctly ordered" mean that if there is one message in the queue that needs a long processing time, further messages are not output until this one message is processed?

And if so, how can I specify an Execution Block (for example, a TransformBlock) that does not care about ordering? Or do I have to specify at the consumption end that I don't care about ordering?

Madea answered 6/4, 2014 at 12:16 Comment(1)
Related: Understanding TPL Dataflow Degree of Parallelism ordering – Benton

There is no such block in the library, but you can easily create one yourself by combining an ActionBlock and a BufferBlock. Something like:

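using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Container class (hypothetical name) so the method compiles on its own.
public static class UnorderedBlocks
{
    // Builds a transform-like block whose results are emitted in completion
    // order rather than in input order.
    public static IPropagatorBlock<TInput, TOutput>
        CreateUnorderedTransformBlock<TInput, TOutput>(
        Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
    {
        // Each result is pushed here as soon as its input has been processed.
        var buffer = new BufferBlock<TOutput>(options);
        var action = new ActionBlock<TInput>(
            async input =>
            {
                var output = func(input);
                await buffer.SendAsync(output);
            }, options);

        // Forward the ActionBlock's completion (or fault) to the buffer.
        action.Completion.ContinueWith(
            t =>
            {
                // Fault is implemented explicitly by BufferBlock, hence the cast.
                IDataflowBlock castedBuffer = buffer;

                if (t.IsFaulted)
                {
                    castedBuffer.Fault(t.Exception);
                }
                else if (t.IsCanceled)
                {
                    // do nothing: both blocks share options,
                    // which means they also share CancellationToken
                }
                else
                {
                    castedBuffer.Complete();
                }
            }, TaskScheduler.Default);

        // Expose the pair as one block: input goes to action, output comes from buffer.
        return DataflowBlock.Encapsulate(action, buffer);
    }
}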

This way, once an item is processed by the ActionBlock, it's immediately moved to the BufferBlock, which means ordering is not maintained.

One issue with this code is that it doesn't respect the configured BoundedCapacity: in effect, the capacity of the combined block is twice the capacity set in options, because each of the two blocks has its own separate capacity.

Achromatism answered 6/4, 2014 at 12:42 Comment(2)
Thanks! This should be used as an example on msdn.microsoft.com/en-us/library/hh228606(v=vs.110).aspx, as it makes it clear that Encapsulate is useful for changing the way a dataflow works. – Ardolino
EnsureOrdered was added to DataflowBlockOptions in 2016, and TransformBlock and TransformManyBlock implement it. For an unordered TransformBlock, set EnsureOrdered to false in the DataflowBlockOptions: github.com/dotnet/corefx/pull/5191 – Discant

(Upgrading a comment by NPNelson to an answer)

The DataflowBlockOptions class contains a configurable property, EnsureOrdered (introduced in 2016), that determines whether the order of the received messages is preserved at the block's output. This property is true by default. Setting it to false makes the block propagate messages as soon as they are processed, which increases the throughput of the pipeline thanks to faster propagation and reduced overhead.
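
A minimal sketch of this option, assuming the System.Threading.Tasks.Dataflow assembly is available. The class name UnorderedOutputDemo, the delays and the item values are hypothetical; the only functional change compared with a default TransformBlock is EnsureOrdered = false.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// UnorderedOutputDemo is a hypothetical name used only for this illustration.
public static class UnorderedOutputDemo
{
    // Sends 1..4 through a parallel TransformBlock in which item 1 is slow,
    // with ordering disabled, and returns the items in arrival order.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // up to 4 items are transformed at once
            EnsureOrdered = false       // emit each result as soon as it is ready
        };

        var transform = new TransformBlock<int, int>(async n =>
        {
            // Item 1 takes much longer than the others (hypothetical delays).
            await Task.Delay(n == 1 ? 500 : 10);
            return n;
        }, options);

        // Single-threaded collector, so List.Add is never called concurrently.
        var received = new List<int>();
        var collector = new ActionBlock<int>(n => received.Add(n));
        transform.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++)
        {
            transform.Post(i);
        }
        transform.Complete();
        await collector.Completion;

        // Arrival order now reflects completion order, not input order.
        return received;
    }
}
```

Here the slow item 1 no longer holds back the others, so it will usually be the last item received; the exact order depends on timing and is not guaranteed.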

Benton answered 6/4, 2014 at 12:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.