TPL Dataflow, how to forward items to only one specific target block among many linked target blocks?
I am looking for a TPL Dataflow block that can hold more than a single item and can link to multiple target blocks, but forwards each item only to the one target block whose filter/predicate it passes. At no time should an item be delivered to more than one target block: it should go only to the target that matches the filter, or else be discarded. I am not fond of BroadcastBlock because, if I understand correctly, it does not guarantee delivery (or does it?) and the filtering is done on the target side, meaning BroadcastBlock essentially sends a copy of each item to every linked target block. It also does not hold more than one item at any time, if I understand correctly. I do not want to use Post/SendAsync; I want to maintain a LinkTo chain.

Is there a way to avoid writing a completely custom dataflow block? Or am I misunderstanding how BroadcastBlock works? Unfortunately, there is not much documentation out there that goes into detail and covers use cases. Any ideas are highly appreciated.

Nkrumah answered 28/11, 2012 at 6:35 Comment(0)
35

If I understand you correctly, what you want could be accomplished by a simple BufferBlock, which would be linked to all your target blocks with predicates. You would also (unconditionally) link it to a NullTarget block, to discard items that didn't match.

Something like:

var forwarder = new BufferBlock<SomeType>();
forwarder.LinkTo(target1, item => matchesTarget1(item));
forwarder.LinkTo(target2, item => matchesTarget2(item));
forwarder.LinkTo(DataflowBlock.NullTarget<SomeType>());

This way, each item will be sent to the first target that matches, if there is any.
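As a rough, self-contained sketch of the same linking pattern: the int payload, the even/multiple-of-three predicates, and the names FirstMatchRouting, evenTarget and threeTarget are assumptions made up for illustration; they stand in for SomeType, matchesTarget1 and matchesTarget2 above. Both targets are left unbounded here.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FirstMatchRouting
{
    // Posts 1..6 and routes each item to the first linked target whose predicate matches.
    public static async Task<(List<int> Evens, List<int> Threes)> RunAsync()
    {
        var evens = new List<int>();
        var threes = new List<int>();

        var forwarder = new BufferBlock<int>();
        // Each ActionBlock runs its delegate one item at a time by default,
        // so each list is only touched by one handler invocation at once.
        var evenTarget = new ActionBlock<int>(i => evens.Add(i));
        var threeTarget = new ActionBlock<int>(i => threes.Add(i));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        forwarder.LinkTo(evenTarget, propagate, i => i % 2 == 0);   // tried first
        forwarder.LinkTo(threeTarget, propagate, i => i % 3 == 0);  // tried second
        forwarder.LinkTo(DataflowBlock.NullTarget<int>());          // discards the rest

        for (int i = 1; i <= 6; i++) forwarder.Post(i);
        forwarder.Complete();

        await Task.WhenAll(evenTarget.Completion, threeTarget.Completion);
        return (evens, threes);
    }
}
```

In this sketch 6 matches both predicates but reaches only evenTarget, because that link was added first; 1 and 5 match neither and end up in the NullTarget.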

BroadcastBlock can be useful if you want to send each item to multiple targets, or if you want to discard items if the target block is not fast enough.

With BroadcastBlock, an item may be dropped if no block accepts it when it is offered (even though a block might have been able to accept it later). But it doesn't drop items at random, so if your target blocks don't have BoundedCapacity set, I think you can be sure that they will get all items that they don't decline (e.g. by using a predicate in LinkTo()).
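For contrast, here is a small sketch of the BroadcastBlock behaviour described above, again with made-up names (BroadcastDemo, allTarget, evenTarget) and an int payload chosen for illustration. It assumes unbounded targets, which is the case where no items are expected to be lost.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDemo
{
    // Posts 1..4 to a BroadcastBlock linked to two unbounded targets.
    public static async Task<(List<int> All, List<int> Evens)> RunAsync()
    {
        var all = new List<int>();
        var evens = new List<int>();

        // The constructor takes a cloning function; identity is enough for a value type.
        var broadcaster = new BroadcastBlock<int>(i => i);
        var allTarget = new ActionBlock<int>(i => all.Add(i));
        var evenTarget = new ActionBlock<int>(i => evens.Add(i));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(allTarget, propagate);                   // offered every item
        broadcaster.LinkTo(evenTarget, propagate, i => i % 2 == 0); // declines odd items

        for (int i = 1; i <= 4; i++) broadcaster.Post(i);
        broadcaster.Complete();

        await Task.WhenAll(allTarget.Completion, evenTarget.Completion);
        return (all, evens);
    }
}
```

Unlike the BufferBlock version, an even item is delivered to both targets here, which is exactly the behaviour the question wants to avoid.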

Compliment answered 28/11, 2012 at 16:10 Comment(8)
How does this technically work? Is each target tried until there is a match, and if there is none, is the item clogging up memory unless it is flushed by the null target block? And does the order in which I link the target blocks matter? - Nkrumah
Yeah, each target is tried in sequence. If no target matches, the item will stay in the buffer block. And in that case, clogging up memory is not a big concern, clogging up the pipeline is. In other words, it would mean no other item will be sent from this block until the problematic one is accepted by some target. That's why the NullTarget block is necessary. And yes, the order matters, which is also why you can specify if you want to append or prepend each target to the list. - Compliment
You really are the guy to go to re TDF. Awesome. Thanks a lot. Are you extensively using TPL Dataflow, or why the deep knowledge (among other topics) of this library? You mentioned earlier you are not affiliated with the MS concurrency team. - Nkrumah
Last question. So for performance purposes it would be advisable to link the target block that is streamed to most frequently first, I assume? I mean, given a filter is provided... - Nkrumah
It's just that I'm really interested in TPL and especially in TDF. And I also implemented part of TDF for Mono (as a Google Summer of Code project), which meant I had to learn a lot about it. - Compliment
And yeah, for performance reasons, you should put the block you expect to be used the most first. - Compliment
SVick is quickly becoming the Paul Erdős of TDF answers. I'm sure a few years from now, we'll be using a SVick number to measure the collaborative distance between our peers and him... ; ) - Livraison
@Compliment This answer is correct if the target blocks have unbounded capacity. If the targets are bounded and you expect the source block to throttle itself instead of just discarding the messages, the null target should have a predicate that is a negation of the previous target predicates. - Bacteroid
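The point made in the comments about link order and prepending can be sketched roughly as follows. DataflowLinkOptions.Append is the library's switch for this; the class name LinkOrderDemo, the two targets and their predicates are assumptions invented for the example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkOrderDemo
{
    // Posts 1..4; the second link is prepended, so it is tried before the first one.
    public static async Task<(List<int> Broad, List<int> Specific)> RunAsync()
    {
        var broad = new List<int>();
        var specific = new List<int>();

        var forwarder = new BufferBlock<int>();
        var broadTarget = new ActionBlock<int>(i => broad.Add(i));
        var specificTarget = new ActionBlock<int>(i => specific.Add(i));

        // Linked first; with the default Append = true it would also be tried first.
        forwarder.LinkTo(broadTarget,
            new DataflowLinkOptions { PropagateCompletion = true },
            i => i > 0);

        // Append = false puts this link at the front of the list instead of the end.
        forwarder.LinkTo(specificTarget,
            new DataflowLinkOptions { PropagateCompletion = true, Append = false },
            i => i % 2 == 0);

        for (int i = 1; i <= 4; i++) forwarder.Post(i);
        forwarder.Complete();

        await Task.WhenAll(broadTarget.Completion, specificTarget.Completion);
        return (broad, specific);
    }
}
```

Without Append = false, broadTarget would accept every item here and specificTarget would receive nothing.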
14

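I've found the accepted answer to be incorrect when the target blocks have a bounded capacity. The NullTarget should be linked with a predicate that is the negation of your consumers' predicates. Otherwise a message that a full consumer could not take right away falls through to the NullTarget, and you drop messages that you wanted to consume.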

var forwarder = new BufferBlock<SomeType>();
forwarder.LinkTo(target1, item => matchesTarget1(item));
forwarder.LinkTo(target2, item => matchesTarget2(item));
forwarder.LinkTo(DataflowBlock.NullTarget<SomeType>(), item => !matchesTarget1(item) && !matchesTarget2(item));
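
A minimal sketch of the difference, under assumptions chosen to make it reproducible: a single consumer with BoundedCapacity = 1 whose handler is held open by a TaskCompletionSource, and a hypothetical flag negateNullTargetPredicate to switch between the two linking styles. The names BoundedTargetDemo, gate and matches are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedTargetDemo
{
    // Posts 1..3 to a consumer that can hold one item and stays busy until the gate opens.
    public static async Task<List<int>> RunAsync(bool negateNullTargetPredicate)
    {
        var received = new List<int>();
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var forwarder = new BufferBlock<int>();
        var target = new ActionBlock<int>(
            async i => { received.Add(i); await gate.Task; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        Predicate<int> matches = i => i > 0; // every posted item is meant for the consumer
        forwarder.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }, matches);

        if (negateNullTargetPredicate)
            forwarder.LinkTo(DataflowBlock.NullTarget<int>(), i => !matches(i));
        else
            forwarder.LinkTo(DataflowBlock.NullTarget<int>()); // also takes items a full consumer postponed

        for (int i = 1; i <= 3; i++) forwarder.Post(i);
        forwarder.Complete();

        // In the unconditional case the buffer drains while the consumer is still busy,
        // so wait for that before letting the consumer continue.
        if (!negateNullTargetPredicate) await forwarder.Completion;

        gate.SetResult(true);
        await target.Completion;
        return received;
    }
}
```

With the unconditional NullTarget, only the first item should reach the consumer in this setup; with the negated predicate the remaining items wait in the BufferBlock until the consumer has room, and all three arrive.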
Radiotherapy answered 19/6, 2015 at 19:55 Comment(4)
Could you please elaborate on the dropped messages? Filters are applied in sequence, so if the linking is done in the proper order, there is no way to drop a valid message, right? - Whosoever
@Whosoever So, finally, are the links tried in the order in which they were added? Just want to make sure. - Tempting
@Whosoever If your target block has a bounded capacity and its input queue is full, the source block can't tell whether the message was declined because of the predicate or because the queue was full. This will result in "dropped" messages if your final destination is a null target without a predicate. I lost a lot of time to this because seemingly every write-up on Dataflow gets this detail wrong. - Bacteroid
@Bacteroid You saved me a lot of multithreading debugging. Your comment was indeed the first time I've read this detail. - Otterburn
