Is there any way to limit performance degradation with TPL Dataflow throttling?
I have a complicated pipeline of components and I am trying to limit its memory requirements. I read from multiple files in parallel; some components in the pipeline may do additional reads from random parts of those files, and the remaining components do CPU-bound operations.
I simplified the performance test bench into the following tests, which all share a common test method.
private void TPLPerformaceTest(int generateNumbers,
    ExecutionDataflowBlockOptions transformBlockOptions)
{
    var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);
    var storedCount = 0;
    var generatedCount = 0;
    var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));
    transformBlock.LinkTo(store);
    transformBlock.Completion.ContinueWith(_ => store.Complete());
    for (int i = 0; i < generateNumbers; i++)
    {
        transformBlock.SendAsync(i).Wait(); //To ensure delivery
        Interlocked.Increment(ref generatedCount);
    }
    transformBlock.Complete();
    store.Completion.Wait();
    Assert.IsTrue(generatedCount == generateNumbers);
    Assert.IsTrue(storedCount == generateNumbers);
}
The first test has no throttling. On my CPU it takes about 12 s to complete, consumes about 800 MB of RAM, and averages about 35% CPU utilization.
[Test]
public void TPLPerformaceUnlimitedTest()
{
    var generateNumbers = 100_000_000;
    this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions());
}
The second test just sets BoundedCapacity to int.MaxValue, so it should impose no limit at all. It takes 20-30 s to complete, consumes 2.1 GB of RAM, and averages about 50% CPU utilization. According to the manual, BoundedCapacity should be int.MaxValue by default, so I do not see the reason for the performance degradation.
[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTest()
{
    var generateNumbers = 100_000_000;
    this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions()
        { BoundedCapacity = Int32.MaxValue });
}
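To double-check what a freshly constructed options object actually carries, rather than relying on my reading of the manual, a small probe along these lines could be used. This is only a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; OptionsProbe and Describe are hypothetical names introduced for this illustration and are not part of the tests.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class OptionsProbe
{
    // Hypothetical helper (not part of the tests in this question):
    // formats the two settings that the tests vary.
    public static string Describe(ExecutionDataflowBlockOptions options) =>
        $"BoundedCapacity={options.BoundedCapacity}, " +
        $"MaxDegreeOfParallelism={options.MaxDegreeOfParallelism}";

    public static void Main()
    {
        // Values carried by a freshly constructed options object.
        Console.WriteLine(Describe(new ExecutionDataflowBlockOptions()));

        // The library's named constant for "no limit", printed for comparison.
        Console.WriteLine(
            $"DataflowBlockOptions.Unbounded={DataflowBlockOptions.Unbounded}");
    }
}
```

Comparing the first printed line with the value passed explicitly in each test would show whether a test really runs with the default settings or with something different.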
The third test limits BoundedCapacity to generateNumbers / 1000, i.e. 100,000. It takes 60 s to complete, consumes 450 MB of RAM, and averages about 60% CPU utilization.
[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTenthTest()
{
    var generateNumbers = 100_000_000;
    this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions()
        { BoundedCapacity = generateNumbers / 1000 });
}
The fourth test sets MaxDegreeOfParallelism to -1, which according to the manual means no limit. It consumed 27 GB of RAM, averaged about 85% CPU utilization, and had not finished after 5 minutes.
[Test]
[Sequential]
public void TPLPerformaceMaxDegreeOfParallelismTest()
{
    var generateNumbers = 100_000_000;
    this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions()
        { MaxDegreeOfParallelism = -1 });
}
All of these settings seem to hurt performance badly and do not behave as I would reasonably expect.