TPL Dataflows LinkTo multiple consumers not working
Asked Answered
F

1

6

I have a BufferBlock to which I post messages:

public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    //Read a file
    While(!eof)
        row = read one row 
    //if consumers are slow, then sleep for a while
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

This is a 5GB file with 24 million rows.

I now have a Target block which is using a ActionBlock:

public class SolaceTargetBlock : ITargetBlock<string>
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        //post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

Now in a console application, I specify:

 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

I have kept bounded capacity as 1 for testing only.

and now I link these three consumers to my source using LinkTo:

 delimitedFileBlock.LinkTo(solaceTargetBlock1);      
 delimitedFileBlock.LinkTo(solaceTargetBlock2);      
 delimitedFileBlock.LinkTo(solaceTargetBlock3);      

This goes to Thread.Sleep(5000) statement after 10003 rows and the Post in the while loop always returns false.

I was expecting that as I have LinkTo, so the solaceTargetBlocks when done will be able to pick next messages, but LinkTo is not clearing the BufferBlock. So, then how can I load balance between multiple Consumers. Do I have to Receive and write a simple Load Balancing logic to distribute between Consumers?

Filamentary answered 20/9, 2012 at 8:43 Comment(1)
This all depends on how exactly you implemented the block interfaces. But, unless you're doing something more complicated, you don't have to (and probably shouldn't) implement the interfaces yourself. Just create a block settings you need and that's all.Exarchate
G
14

From the documentation of the Post method on the DataflowBlock<T> class (emphasis mine):

This method will return once the target block has decided to accept or decline the item,

This means that the target can choose to decline the block (which is the behavior that you're seeing).

Further on, it states:

For target blocks that support postponing offered messages, or for blocks that may do more processing in their Post implementation, consider using SendAsync, which will return immediately and will enable the target to postpone the posted message and later consume it after SendAsync returns.

This means that you may have better results (depending on the target block) in that your message may be postponed, but still processed, as opposed to outright rejected.

I imagine that the BoundedCapacity property settings on both the BufferBlock<T> and the three ActionBlock<TInput> instances have something to do with what you're seeing:

  • Your maximum buffer on the BufferBlock<T> is 10000; once you put 10,000 items into the queue, it will reject the rest (see second quote above), as it can't process them (SendAsync won't work here either, as it can't buffer the message to be postponed).

  • Your maximum buffer on the ActionBlock<TInput> instances is 1, and you have three of them.

10,000 + (1 * 3) = 10,000 + 3 = 10,003

To get around this, you need to do a few things.

First, you need to set a more reasonable value for MaxDegreeOfParallelism property the ExecutionDataFlowBlockOptions when creating the ActionBlock<TInput> instances.

By default, the MaxDegreeOfParallelism for an ActionBlock<TInput> is set to 1; this guarantees that calls will be serialized and you don't have to worry about thread-safety. If you want the ActionBlock<T> to be concerned about thread-safety, then keep this setting.

If the ActionBlock<TInput> is thread-safe, then you have no reason to throttle it, and you should set MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded.

Chances are if you're accessing some sort of shared resource in the ActionBlock<TInput> that can be accessed concurrently on a limited basis, then you're probably doing the wrong thing.

If you have some sort of shared resource, then chances are you should run it through another block and set the MaxDegreeOfParallelism on that.

Second, if you are concerned with throughput and are ok with dropped items, then you should set the BoundedCapacity property.

Also note, you indicate "if consumers are slow, sleep for a while"; there's no reason to do this if you wire up your blocks correctly, you should just let the data flow through and place the restrictions only where you need them. Your producer shouldn't be responsible for throttling the consumer, let the consumer be responsible for the throttling.

Lastly, your code doesn't look like you need to implement the dataflow block interfaces yourself. You could construct it like so:

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}

Also note that having three ActionBlock<TInput> instances is uncessary, unless you need to filter the output to different actions (which you aren't doing here), so the above really reduces to this (assuming your action is thread-safe, so you're going to increase MaxDegreeOfParallelism to Unbounded anyways):

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}
Gretchen answered 16/11, 2012 at 21:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.