I have a producer/consumer pattern in my app, implemented using TPL Dataflow. I have a big dataflow mesh with about 40 blocks in it. There are two main functional parts in the mesh: a producer part and a consumer part. The producer is supposed to continuously provide a lot of work for the consumer, while the consumer sometimes handles incoming work slowly. I want to pause the producer when the consumer is busy with some specified number of work items. Otherwise the app consumes a lot of memory/CPU and behaves unsustainably.
I made a demo app that demonstrates the issue:
    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace DataflowTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                var options = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 4,
                    EnsureOrdered = false
                };

                var boundedOptions = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 4,
                    EnsureOrdered = false,
                    BoundedCapacity = 5
                };

                var bufferBlock = new BufferBlock<int>(boundedOptions);
                var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
                var broadcastBlock = new BroadcastBlock<int>(x => x, options);

                var consumerBlock = new ActionBlock<int>(async x =>
                {
                    var delay = 1000;
                    if (x > 10) delay = 5000;

                    await Task.Delay(delay);

                    Console.WriteLine(x);
                }, boundedOptions);

                producerBlock.LinkTo(bufferBlock);
                bufferBlock.LinkTo(broadcastBlock);
                broadcastBlock.LinkTo(producerBlock);
                broadcastBlock.LinkTo(consumerBlock);

                bufferBlock.Post(1);

                consumerBlock.Completion.Wait();
            }
        }
    }
The app prints something like this:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
That means the producer keeps spinning and pushing messages to the consumer. I want it to pause and wait until the consumer has finished its current portion of work, and only then continue providing messages for the consumer.
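The gaps in the printed numbers look consistent with how BroadcastBlock is documented to behave: it holds only the most recent message and does not wait for a target that is currently full. The following is a minimal sketch of that behavior in isolation, not part of the mesh above; the class name, the 200 ms delay and the count of 1000 are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BroadcastDropSketch
{
    static async Task Main()
    {
        var broadcastBlock = new BroadcastBlock<int>(x => x);

        var consumed = 0;

        // Slow, bounded consumer: it declines new messages while it is busy.
        // Default MaxDegreeOfParallelism is 1, so the counter needs no locking.
        var slowConsumer = new ActionBlock<int>(async x =>
        {
            await Task.Delay(200); // simulate slow work (illustrative value)
            consumed++;
            Console.WriteLine($"consumed {x}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcastBlock.LinkTo(slowConsumer,
            new DataflowLinkOptions { PropagateCompletion = true });

        // The broadcast block accepts every message immediately; it overwrites
        // its current value instead of waiting for the busy consumer.
        for (var i = 1; i <= 1000; i++)
        {
            broadcastBlock.Post(i);
        }

        broadcastBlock.Complete();
        await slowConsumer.Completion;

        // The exact count varies from run to run.
        Console.WriteLine($"posted 1000, consumed {consumed}");
    }
}
```

If this reading is right, the consumer should see far fewer than 1000 values, which would mean the bounded consumer never slows down anything upstream of the BroadcastBlock.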
My question is quite similar to another question, but that one wasn't answered properly. I tried that solution and it doesn't work here: it still allows the producer to flood the consumer with messages. Setting BoundedCapacity doesn't work either.
The only solution I can think of so far is to write my own block that monitors the target block's queue and acts accordingly. But I hope that is overkill for this issue.
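For comparison, here is a minimal sketch of the kind of pausing I am after, assuming the producer can be written as a plain loop that feeds a bounded block directly through SendAsync, with no BroadcastBlock in between. The loop, the class name and the item count are hypothetical stand-ins for the producer part of the real mesh.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BackpressureSketch
{
    static async Task Main()
    {
        // Bounded consumer: at most 5 items queued or in flight at any time.
        var consumerBlock = new ActionBlock<int>(async x =>
        {
            await Task.Delay(100); // simulate slow work (illustrative value)
            Console.WriteLine(x);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 5
        });

        // Hypothetical producer loop standing in for the producer blocks.
        for (var i = 1; i <= 20; i++)
        {
            // Unlike Post, SendAsync completes only once the bounded block has
            // accepted the item, so the loop pauses while the consumer is full.
            await consumerBlock.SendAsync(i);
        }

        consumerBlock.Complete();
        await consumerBlock.Completion;
    }
}
```

Here the producer can never get more than 5 items ahead of the consumer. The open question is how to get the same effect inside a mesh where a BroadcastBlock sits between the two parts.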
Rx instead? Take a look at this answer: https://mcmap.net/q/345147/-tpl-vs-reactive-framework – Ransell
BoundedCapacity option. In the specific case that the producer is a TransformManyBlock block, the BoundedCapacity has no effect on the output queue of this block, but there are solutions here. – Kortneykoruna