I have a chain of TPL Dataflow blocks and would like to observe progress somewhere inside the system.
I am aware that I could just jam a `TransformBlock` into the mesh where I want to observe, have it post to a progress updater of some variety, and then return the message unchanged to the next block. I don't love this solution: the block would be there purely for its side effect, and I would also have to change the block-linking logic wherever I want to observe.
So I wondered if I could use `ISourceBlock<T>.AsObservable` to observe messages passing through the mesh without altering it and without consuming them. If it worked, this would be both a purer and a more practical solution.
From my (limited) understanding of Rx, that means the observable needs to be hot rather than cold, so that my progress updater sees each message without consuming it. `.Publish().RefCount()` seems to be the way to make an observable hot. However, it simply does not work as intended. Instead, either `block2` or `progress` receives and consumes each message.
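Here is a minimal sketch of what `.Publish().RefCount()` does and does not do, assuming the System.Reactive package. A `Subject<int>` stands in for the upstream source. The operator shares one upstream subscription among Rx subscribers, so every subscriber of the shared observable sees every value. It does not change who else consumes from the upstream source. In the question that source is `block1.AsObservable()`, which, as far as I can tell, is itself just one more consumer linked to `block1`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A Subject stands in for an upstream source; it is hot by nature.
var source = new Subject<int>();

// Publish().RefCount() shares ONE subscription to `source` among all Rx subscribers
// of `shared`, connecting when the first subscriber arrives.
var shared = source.Publish().RefCount();

var first = new List<int>();
var second = new List<int>();
shared.Subscribe(first.Add);
shared.Subscribe(second.Add);

foreach (var i in new[] { 21, 22, 23 })
    source.OnNext(i);

// Both Rx subscribers see every value. The multicast only covers observers of `shared`.
Console.WriteLine(string.Join(",", first));       // 21,22,23
Console.WriteLine(first.SequenceEqual(second));   // True
```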
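```csharp
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    // Post returns false (and the value is lost) when bounded block1 is full;
    // SendAsync waits until block1 can accept the value.
    block1.SendAsync(v).Wait();
}
block1.Complete();
block2.Completion.Wait(); // keep the program alive until block2 has processed everything
```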
The result is non-deterministic, but I get something mixed like this:

```
block2:21
progress:22
progress:24
block2:23
progress:25
```
So, am I doing something wrong, or is this impossible because of the way TPL Dataflow's `AsObservable` is implemented?
I realise I could also replace the `LinkTo` between `block1` and `block2` with an Observable/Observer pair, and that might work, but `LinkTo` with a downstream `BoundedCapacity = 1` is the whole reason I'm using TPL Dataflow in the first place.
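Here is a minimal sketch of the dataflow behaviour behind the interleaved output, assuming the System.Threading.Tasks.Dataflow package. My understanding is that `AsObservable` attaches its own internal target to the source block through a link. That makes it compete with `block2` like any other linked target rather than passively tapping the stream. A source block offers each message to its links in order and gives it to the first target that accepts, so each message is consumed exactly once. The names `source`, `targetA` and `targetB` are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// One source with two linked targets, mirroring block1 -> block2 plus the
// target that AsObservable links to block1.
var source = new BufferBlock<int>();
var a = new List<int>();
var b = new List<int>();
var targetA = new ActionBlock<int>(i => a.Add(i)); // unbounded: accepts every offer
var targetB = new ActionBlock<int>(i => b.Add(i));
var options = new DataflowLinkOptions { PropagateCompletion = true };
source.LinkTo(targetA, options); // offered first
source.LinkTo(targetB, options); // only offered what targetA does not accept

foreach (var i in Enumerable.Range(1, 5))
    source.Post(i);
source.Complete();
await Task.WhenAll(targetA.Completion, targetB.Completion);

// Each message went to exactly one target; the second link saw nothing.
Console.WriteLine($"A got {a.Count}, B got {b.Count}"); // A got 5, B got 0
Console.WriteLine(a.Intersect(b).Any());                // False
```

With an unbounded first target, the second target never receives anything. With `BoundedCapacity = 1`, as in the question, a busy target postpones the offer and the message goes to whichever link accepts it. That is consistent with the alternating `block2`/`progress` lines in the output.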
edit: A few clarifications:

- I did intend to set `BoundedCapacity = 1` in `block2`. While it's unnecessary in this trivial example, the downstream-constrained case is where I find TPL Dataflow really useful. To clarify the solution I rejected in my second paragraph, it would be to add the following block, linked in between `block1` and `block2`:

  ```csharp
  var progressBlock = new TransformBlock<int, int>(i => { SomeUpdateProgressMethod(i); return i; });
  ```

- I would also like to maintain back-pressure, so that if a further-upstream block was distributing work to `block1` and to other equivalent workers, it wouldn't send work to `block1` if that chain was already busy.
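The back-pressure described in the last bullet can be sketched as follows, assuming the System.Threading.Tasks.Dataflow package. `distributor`, `workerA`, `workerB` and the `gate` that keeps worker A busy are hypothetical stand-ins. A `BufferBlock` linked to two workers with `BoundedCapacity = 1` offers each message to its links in order. A worker that is already full postpones the offer, so the message goes to the next worker instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Gate that keeps worker A "busy" until it is released.
using var gate = new ManualResetEventSlim(false);
var gotA = new List<int>();
var gotB = new List<int>();

// BoundedCapacity = 1 counts the item being processed, so a busy worker is full.
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var workerA = new ActionBlock<int>(i => { gotA.Add(i); gate.Wait(); }, bounded);
var workerB = new ActionBlock<int>(i => gotB.Add(i), bounded);

// Upstream distributor linked to both workers; worker A is offered first.
var distributor = new BufferBlock<int>();
var options = new DataflowLinkOptions { PropagateCompletion = true };
distributor.LinkTo(workerA, options);
distributor.LinkTo(workerB, options);

distributor.Post(1); // A accepts it and then blocks on the gate
distributor.Post(2); // A is full and postpones, so B accepts it instead
distributor.Complete();
await workerB.Completion;
gate.Set();          // let A finish so it can complete too
await workerA.Completion;

Console.WriteLine(string.Join(",", gotA)); // 1
Console.WriteLine(string.Join(",", gotB)); // 2
```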
…`.Publish().RefCount()` as it can create observables that can run only once. Do you really need to share observers? – Lateen

…`.Publish().RefCount()`, I just thought from reading around that it might make the observable 'hot' so that both the progress updater and `block2` see the throughput. `block2` needs to receive the data for computation; the simple `block2` in the example could be a stand-in for a whole chain of interlinked dataflow blocks that execute a computation. The observable of `block1`, meanwhile, is for progress updates, i.e. reports that eventually reach a UI. – Dynamiter

…`.Publish().RefCount()` allows for multiple observers of the same source stream, and it has nothing to do with what the source is or how it gets its data. – Lateen

…`block2` and `progress`. My guess, however, is that `block2` isn't treated as an `Observer` because its link to `block1` is not done the Rx way, but through however TPL Dataflow implements `LinkTo` internally (and `AsObservable`). So we don't get a successful `MultiCast`, because it would need to be set up like that inside `block1`. Does that sound right? – Dynamiter

…`ForEachAsync` method call (you should use `.Subscribe` instead). – Lateen

…`GroupingDataflowBlockOptions`, e.g. `JoinBlock` and `BatchBlock`. That design makes some sense to me; however, if you put other types of block, e.g. `TransformBlock`, into your mesh before that, you lose the ability to transmit the non-greediness further back upstream. So I use `BoundedCapacity = 1` (zero is not allowed). Basically, TPL Dataflow puts buffers, buffers everywhere, by default, but in the downstream-constrained back-pressure use case I only want a buffer in certain spots, definitely not others. – Dynamiter

…`LinkTo` predicate to post each message to an observable and then return true. To hide the impurity of this, it might be possible to create an extension method `ObservableLinkTo` which creates the observable and calls the regular `LinkTo` at the same time. – Dynamiter

…`BroadcastBlock` won't propagate data to targets that can't accept it immediately. I missed that. This is an excellent question; I'm where you were when you asked it. `AsObservable` feels like the perfect solution for publishing a stream of progress updates, except for the details. Sounds like you've moved on, but if I come up with something that works and isn't too hacky I might post an answer, in case it helps others. – Penates
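The `LinkTo`-predicate idea from the comments might look roughly like the sketch below. It rests on several assumptions. It uses System.Reactive's `Subject<T>` as the observable. `ObservableLinkTo` is a hypothetical helper, not part of TPL Dataflow. It is written as a local function rather than a real extension method so that it runs as top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: links source -> target with the regular LinkTo and pushes
// every message offered across the link into a hot Subject<T>.
// (A real extension method would live in a static class.)
IObservable<T> ObservableLinkTo<T>(ISourceBlock<T> source, ITargetBlock<T> target,
    DataflowLinkOptions options, out IDisposable link)
{
    var subject = new Subject<T>();
    // The predicate never filters anything; it exists only for its side effect.
    link = source.LinkTo(target, options, item => { subject.OnNext(item); return true; });
    // End the observable when the source block completes or faults.
    source.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) subject.OnError(t.Exception!);
        else subject.OnCompleted();
    }, TaskScheduler.Default);
    return subject;
}

// The mesh from the question, with the plain LinkTo replaced by the helper.
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var received = new List<int>();
var block1 = new TransformBlock<int, int>(i => i + 20, bounded);
var block2 = new ActionBlock<int>(i => received.Add(i), bounded);

var progress = ObservableLinkTo(block1, block2,
    new DataflowLinkOptions { PropagateCompletion = true }, out var link);

// Subscribe before any data flows: a Subject does not replay earlier values.
var progressLog = new List<int>();
progress.Subscribe(i => { lock (progressLog) progressLog.Add(i); });

foreach (var v in Enumerable.Range(1, 5))
    await block1.SendAsync(v); // waits for space instead of dropping like Post can
block1.Complete();
await block2.Completion;

Console.WriteLine(string.Join(",", received));               // 21,22,23,24,25
Console.WriteLine(string.Join(",", progressLog.Distinct())); // 21,22,23,24,25
```

There are two caveats with this approach. First, the predicate runs when `block1` offers a message across the link, not when `block2` actually consumes it, so progress can run slightly ahead of the real work. Second, as far as I know, nothing guarantees that the predicate runs exactly once per message, because a source may offer a postponed message again. That is why the sketch prints the distinct progress values, and a real progress reporter should tolerate duplicates.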