BufferBlock and ActionBlock with BoundedCapacity do not use max DOP

I have this code:

var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);

    await Task.Delay(1000);

    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = -1
});

data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 3; id++)
{
    Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
    data.SendAsync(id).Wait();
    Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}

data.Complete();

Task.WhenAll(data.Completion, action.Completion).Wait();

And this code gets me this output:

[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End

Why isn't ActionBlock working in parallel even though it has an unbounded DOP?

Bailiwick answered 26/10, 2014 at 19:34 Comment(0)

The reason your ActionBlock seems to have a limited degree of parallelism is that it has a BoundedCapacity of 1. BoundedCapacity (unlike InputCount) includes the item being processed at the moment. This can be easily demonstrated:

var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

await block.SendAsync(4); // Accepted: the block was empty
await block.SendAsync(4); // Never completes: the block is full and the first item never finishes

That means that although you set MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded (the same value as the -1 in your code), the block can't accept more than a single item at a time, which in practice limits your degree of parallelism to 1.
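
If you would rather not leave an await hanging, here is a rough sketch of the same check using Post, which returns false straight away when a bounded block declines an item. The class and method names (BoundedCapacityProbe, ProbeAsync) are made up for illustration, and the 200 ms pause is only an assumption that this is long enough for the handler to pick up the first item.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityProbe
{
    // Hypothetical helper: posts two items to a full-on-first-item block and
    // reports what happened to each, plus the queue length in between.
    public static async Task<(bool First, int Queued, bool Second)> ProbeAsync()
    {
        // The handler never finishes, so the first item stays "in flight".
        var block = new ActionBlock<int>(_ => Task.Delay(Timeout.Infinite), new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        bool first = block.Post(1);    // accepted: the block was empty

        await Task.Delay(200);         // assumption: enough time for the handler to start

        int queued = block.InputCount; // counts waiting items only, not the one in flight
        bool second = block.Post(2);   // declined: the in-flight item still uses the capacity

        return (first, queued, second);
    }

    public static async Task Main()
    {
        var result = await ProbeAsync();
        Console.WriteLine("First Post accepted:  {0}", result.First);
        Console.WriteLine("InputCount:           {0}", result.Queued);
        Console.WriteLine("Second Post accepted: {0}", result.Second);
    }
}
```

The first Post is accepted and the second is declined. InputCount should normally read 0 by then, which is the point: the queue is empty, yet the block is still considered full.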

You can fix that by setting a larger BoundedCapacity:

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
    await Task.Delay(1000);
    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
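
To see the difference for yourself, something like the following sketch should work. It is not part of the original code: PeakConcurrencyDemo and MeasurePeakAsync are hypothetical names, and the counter logic is just one simple way to record how many handlers overlap.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeakConcurrencyDemo
{
    // Hypothetical helper: sends `items` one-second jobs through an ActionBlock
    // with unbounded DOP and returns the highest number of handlers that were
    // running at the same time.
    public static async Task<int> MeasurePeakAsync(int boundedCapacity, int items)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(1000);
            lock (gate) { running--; }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        for (var id = 1; id <= items; id++)
            await block.SendAsync(id); // waits while the block is at capacity

        block.Complete();
        await block.Completion;
        return peak;
    }

    public static async Task Main()
    {
        Console.WriteLine("BoundedCapacity = 1:  peak = {0}", await MeasurePeakAsync(1, 3));
        Console.WriteLine("BoundedCapacity = 10: peak = {0}", await MeasurePeakAsync(10, 3));
    }
}
```

With BoundedCapacity = 1 the peak is 1, because a second item cannot enter the block until the first one has finished. With BoundedCapacity = 10 the three items fit in the block together, so the peak should rise above 1 (typically to 3).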
Anticipant answered 27/10, 2014 at 18:4 Comment(3)
Ok, but I've also set DOP to -1. I thought that Dataflow would create as many copies of the ActionBlock as it needs, meaning that even if one ActionBlock instance has one item and another item is incoming, it could create another instance of ActionBlock to handle the new item, and so on. No? Bailiwick
@MichaelLogutov No block copies itself. Unless you create more, there's only a single instance. Blocks use tasks for parallelism, but if you bound a block's capacity, it can't hold enough items to process them in parallel. Anticipant
Thanks. Didn't know that. And the documentation is really lacking on that point. Bailiwick
