From the documentation of the Post method on the DataflowBlock class (emphasis mine):
This method will return once the target block has decided to accept or decline the item,
This means that the target can choose to decline the item (which is the behavior that you're seeing).
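To see the "decline" path in isolation, here is a minimal sketch. It assumes the System.Threading.Tasks.Dataflow package is available, and the capacity of 2 is an arbitrary value chosen for illustration. Post returns a bool, and a bounded block with no room simply returns false:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// A BufferBlock that can hold at most two items. Nothing is linked to it,
// so nothing ever drains it.
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

// Post returns true if the block accepted the item, false if it declined it.
var results = new bool[4];
for (int i = 0; i < results.Length; i++)
{
    results[i] = buffer.Post(i);
}

Console.WriteLine(string.Join(", ", results)); // True, True, False, False
```

Nothing throws when an item is declined, so if you ignore the return value of Post, the dropped items just disappear silently.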
Further on, it states:
For target blocks that support postponing offered messages, or for blocks that may do more processing in their Post implementation, consider using SendAsync, which will return immediately and will enable the target to postpone the posted message and later consume it after SendAsync returns.
This means that, depending on the target block, you may get better results: your message may be postponed but still processed, rather than rejected outright.
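A small sketch of the difference between the two calls against a full, bounded BufferBlock<T> (the capacity of 1 and the item values are arbitrary illustration choices). Post is declined immediately, while the Task returned by SendAsync stays pending until space frees up and the block consumes the postponed item:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

buffer.Post(1);                           // fills the only slot
bool posted = buffer.Post(2);             // declined outright

Task<bool> sent = buffer.SendAsync(3);    // postponed, not declined
bool completedEarly = sent.IsCompleted;   // the block is still full

int first = await buffer.ReceiveAsync();  // frees the slot
bool accepted = await sent;               // the postponed item is now consumed
int second = await buffer.ReceiveAsync();

Console.WriteLine($"{posted} {completedEarly} {first} {accepted} {second}");
// False False 1 True 3
```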
I imagine that the BoundedCapacity property settings on both the BufferBlock<T> and the three ActionBlock<TInput> instances have something to do with what you're seeing:
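Your maximum buffer on the BufferBlock<T> is 10,000; once you put 10,000 items into the queue, it will reject the rest (see the first quote above), as it has nowhere to put them. SendAsync would behave differently here: when a bounded BufferBlock<T> is full, it postpones a message offered through SendAsync instead of declining it, and consumes it once space frees up, so awaiting SendAsync slows the producer down instead of losing items.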
Your maximum buffer on the ActionBlock<TInput> instances is 1, and you have three of them. That adds up to the total number of items the blocks can hold at once:
10,000 + (1 * 3) = 10,000 + 3 = 10,003
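The arithmetic can be reproduced at a smaller scale. This is a sketch under a few assumptions: the capacity of 10 stands in for your 10,000, the consumers are kept busy forever with a gate (simulating very slow consumers), and it relies on the buffer handing one item to each consumer within the spin-wait timeout. The count it prints should come out to BufferCapacity + ConsumerCount:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

const int BufferCapacity = 10;  // stands in for the 10,000 in your code
const int ConsumerCount = 3;

// Keeps every consumer "busy" forever, like a very slow consumer would.
var gate = new ManualResetEventSlim(false);
int started = 0;

var buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = BufferCapacity });

for (int c = 0; c < ConsumerCount; c++)
{
    var consumer = new ActionBlock<int>(
        _ => { Interlocked.Increment(ref started); gate.Wait(); },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    buffer.LinkTo(consumer);
}

int accepted = 0;
void PostMany()
{
    for (int i = 0; i < 100; i++)
    {
        if (buffer.Post(i)) accepted++;
    }
}

PostMany();
// Wait (up to 5 seconds) until each consumer has taken one item.
SpinWait.SpinUntil(() => Volatile.Read(ref started) == ConsumerCount,
                   TimeSpan.FromSeconds(5));
// Top the buffer up again after that hand-off freed some slots.
PostMany();

Console.WriteLine(accepted);
gate.Set();  // let the consumers finish
```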
To get around this, you need to do a few things.
First, you need to set a more reasonable value for the MaxDegreeOfParallelism property of the ExecutionDataflowBlockOptions when creating the ActionBlock<TInput> instances.
By default, the MaxDegreeOfParallelism for an ActionBlock<TInput> is set to 1; this guarantees that calls will be serialized and you don't have to worry about thread-safety. If your action is not thread-safe and you want the ActionBlock<TInput> to take care of that for you, then keep this setting.
If the action you pass to the ActionBlock<TInput> is thread-safe, then you have no reason to throttle it, and you should set MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded.
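A minimal sketch of the difference. The MaxConcurrencyAsync helper, the 20 items and the 20 ms delay are made up for illustration; the helper records the highest number of delegate invocations that were ever running at the same time:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: runs 20 slow items through an ActionBlock and
// returns the peak number of overlapping delegate invocations.
static async Task<int> MaxConcurrencyAsync(int maxDegreeOfParallelism)
{
    int current = 0, max = 0;
    var block = new ActionBlock<int>(async _ =>
    {
        int now = Interlocked.Increment(ref current);
        // Record the peak; the compare-exchange loop stays correct under contention.
        int seen;
        while (now > (seen = Volatile.Read(ref max)) &&
               Interlocked.CompareExchange(ref max, now, seen) != seen) { }
        await Task.Delay(20);  // simulate work
        Interlocked.Decrement(ref current);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

    for (int i = 0; i < 20; i++) block.Post(i);
    block.Complete();
    await block.Completion;
    return max;
}

int serial = await MaxConcurrencyAsync(1);  // the default setting
int parallel = await MaxConcurrencyAsync(DataflowBlockOptions.Unbounded);

Console.WriteLine($"MaxDegreeOfParallelism = 1: peak {serial}");  // peak 1
Console.WriteLine($"MaxDegreeOfParallelism = Unbounded: peak {parallel}");
```

With the default of 1 the peak is always 1; with Unbounded, the exact peak depends on timing, which is why the second line has no fixed expected value.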
If you're accessing some sort of shared resource in the ActionBlock<TInput> that can only be accessed concurrently on a limited basis, then chances are you're doing the wrong thing. Instead, you should probably route access to that resource through another block and set the MaxDegreeOfParallelism on that block.
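One way that could look, as a sketch: the thread-safe work runs in a TransformBlock with unbounded parallelism, and only a second block touches the shared resource, with its own limit. The "resource" here is simulated with a delay, and the limit of 2, the block names and the input strings are hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stage 1: thread-safe work with no throttling at all.
var measure = new TransformBlock<string, int>(s => s.Length,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// Stage 2: the only code that touches the (simulated) shared resource,
// throttled to at most two concurrent callers.
int current = 0, peak = 0, total = 0;
var useSharedResource = new ActionBlock<int>(async n =>
{
    int now = Interlocked.Increment(ref current);
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
    await Task.Delay(5);  // stands in for the call to the shared resource
    Interlocked.Add(ref total, n);
    Interlocked.Decrement(ref current);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

measure.LinkTo(useSharedResource, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 50; i++)
{
    measure.Post(new string('x', i % 5 + 1));
}
measure.Complete();
await useSharedResource.Completion;

Console.WriteLine($"peak concurrency {peak}, total {total}");  // total is 150
```

The throttle lives exactly where the constraint is, so the rest of the pipeline is free to run as fast as it can.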
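Second, if you are concerned with throughput and are OK with dropped items, then you should set the BoundedCapacity property. Otherwise, leave it at its default, DataflowBlockOptions.Unbounded, so that items are never declined for lack of space.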
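If you do go the bounded route, it's worth counting what gets dropped. A sketch, assuming a consumer that is still busy with its first item (simulated here with a gate) and a hypothetical BoundedCapacity of 1; the item being processed counts toward the bound, so every later Post is declined:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

var gate = new ManualResetEventSlim(false);  // keeps the consumer busy
var consumer = new ActionBlock<int>(_ => gate.Wait(),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

int accepted = 0, dropped = 0;
for (int i = 0; i < 10; i++)
{
    if (consumer.Post(i)) accepted++;
    else dropped++;
}
gate.Set();  // let the consumer finish its one item

Console.WriteLine($"accepted {accepted}, dropped {dropped}");  // accepted 1, dropped 9
```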
Also note that you indicate "if consumers are slow, sleep for a while". There's no reason to do this if you wire up your blocks correctly: just let the data flow through and place restrictions only where you need them. Your producer shouldn't be responsible for throttling the consumer; let the consumer be responsible for the throttling.
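If you want the consumer to throttle the producer without dropping anything, one option (a variation, not what your current code does) is to bound the consumer and have the producer await SendAsync instead of sleeping. In this sketch the capacity of 5, the 1 ms delay and the item count are arbitrary:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int processed = 0;
// A deliberately slow consumer with a small bounded input queue.
var consumer = new ActionBlock<int>(async _ =>
{
    await Task.Delay(1);
    Interlocked.Increment(ref processed);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

// The producer never sleeps; awaiting SendAsync simply waits whenever
// the consumer's queue is full, so no item is dropped.
for (int i = 0; i < 100; i++)
{
    bool ok = await consumer.SendAsync(i);
    if (!ok) throw new InvalidOperationException("The consumer stopped accepting items.");
}
consumer.Complete();
await consumer.Completion;

Console.WriteLine(processed);  // 100
```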
Lastly, from your code it doesn't look like you need to implement the dataflow block interfaces yourself. You could construct it like so:
using System;
using System.IO;
using System.Threading.Tasks.Dataflow;
// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();
// The Action for the action blocks.
Action<string> action =
    s => { /* Do something with the string here. */ };
// Create the action blocks. Assuming that
// action is thread-safe, there's no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
// Link everything.
delimitedFileBlock.LinkTo(solaceActionBlock1);
delimitedFileBlock.LinkTo(solaceActionBlock2);
delimitedFileBlock.LinkTo(solaceActionBlock3);
// Now read the file, and post each row to the BufferBlock<T>.
// "path" is a hypothetical placeholder for your delimited file.
string path = "data.txt";
using (var reader = new StreamReader(path))
{
    string row;
    while ((row = reader.ReadLine()) != null)
    {
        delimitedFileBlock.Post(row);
    }
}
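If you wire a BufferBlock<T> to several unbounded ActionBlock<TInput> instances like that, you can check where the items actually go. A minimal sketch (the counts array and the 100 items are illustration choices): the buffer offers each item to its first linked target, and an unbounded ActionBlock<TInput> always accepts, so the other blocks stay idle:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var counts = new int[3];
var consumers = new ActionBlock<int>[3];
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

for (int c = 0; c < consumers.Length; c++)
{
    int index = c;  // copy the loop variable for the lambda
    consumers[c] = new ActionBlock<int>(
        _ => Interlocked.Increment(ref counts[index]),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    source.LinkTo(consumers[c], linkOptions);
}

for (int i = 0; i < 100; i++) source.Post(i);
source.Complete();
await Task.WhenAll(consumers.Select(b => b.Completion));

Console.WriteLine(string.Join(", ", counts));  // 100, 0, 0
```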
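Also note that having three ActionBlock<TInput> instances is unnecessary unless you need to filter the output to different actions (which you aren't doing here). In fact, because each ActionBlock<TInput> above has an unbounded capacity, the BufferBlock<T> offers every item to the first linked block, which always accepts it, so the other two would never receive anything. Assuming your action is thread-safe (so you're going to increase MaxDegreeOfParallelism to Unbounded anyway), the above really reduces to this: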
using System;
using System.IO;
using System.Threading.Tasks.Dataflow;
// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();
// The Action for the action block.
Action<string> action =
    s => { /* Do something with the string here. */ };
// Create the action block. Assuming that
// action is thread-safe, there's no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
// Link everything.
delimitedFileBlock.LinkTo(solaceActionBlock);
// Now read the file, and post each row to the BufferBlock<T>.
// "path" is a hypothetical placeholder for your delimited file.
string path = "data.txt";
using (var reader = new StreamReader(path))
{
    string row;
    while ((row = reader.ReadLine()) != null)
    {
        delimitedFileBlock.Post(row);
    }
}
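You will most likely also want to know when every row has been processed. A minimal sketch of finishing the single-block version, assuming an in-memory rows array (hypothetical) in place of the file: link with PropagateCompletion, call Complete on the buffer once all rows are posted, then await the action block's Completion:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Stand-in for the rows read from the delimited file.
string[] rows = { "a,1", "b,2", "c,3" };

int handled = 0;
var delimitedFileBlock = new BufferBlock<string>();
var solaceActionBlock = new ActionBlock<string>(
    s => Interlocked.Increment(ref handled),  // placeholder for the real work
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// PropagateCompletion forwards Complete() (and faults) from the buffer to the action block.
delimitedFileBlock.LinkTo(solaceActionBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

foreach (string row in rows)
{
    delimitedFileBlock.Post(row);
}

// No more rows: signal completion, then wait until every row has been handled.
delimitedFileBlock.Complete();
await solaceActionBlock.Completion;

Console.WriteLine(handled);  // 3
```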