Using AsObservable to observe TPL Dataflow blocks without consuming messages

I have a chain of TPL Dataflow blocks and would like to observe progress somewhere inside the system.

I am aware that I could just jam a TransformBlock into the mesh where I want to observe, get it to post to a progress updater of some variety and then return the message unchanged to the next block. I don't love this solution as the block would be purely there for its side-effect and I would also have to change the block linking logic wherever I want to observe.

So I wondered if I could use ISourceBlock<T>.AsObservable to observe the passing of messages within the mesh without altering it and without consuming the messages. This seems both a purer and more practical solution, if it worked.

From my (limited) understanding of Rx that means that I need the observable to be hot rather than cold, so that my progress updater sees the message but doesn't consume it. And .Publish().RefCount() seems to be the way to make an observable hot. However, it simply does not work as intended - instead either block2 or progress receives and consumes each message.

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

The result is non-deterministic, but I get something interleaved like this:

block2:21
progress:22
progress:24
block2:23
progress:25

So, am I doing something wrong, or is this impossible due to the way TPL Dataflow's AsObservable is implemented?

I realise I could also replace the LinkTo between block1 and block2 with an Observable/Observer pair and that might work, but LinkTo with downstream BoundedCapacity = 1 is the whole reason I'm using TPL Dataflow in the first place.

edit: A few clarifications:

  • I did intend to set BoundedCapacity=1 in block2. While it's unnecessary in this trivial example, the downstream-constrained case is where I find TPL Dataflow really useful.
  • To clarify the solution I rejected in my second paragraph, it would be to add the following block linked in between block1 and block2:

    var progressBlock = new TransformBlock<int, int>(i => { SomeUpdateProgressMethod(i); return i; });

  • I would also like to maintain back-pressure so that if a further-upstream block was distributing work to block1 and also other equivalent workers, it wouldn't send work to block1 if that chain was already busy.

Dynamiter answered 16/6, 2017 at 1:43 Comment(14)
Careful using .Publish().RefCount() as it can create observables that can run only once. Do you really need to share observers?Lateen
@Lateen - I don't have a great understanding of .Publish().RefCount(), I just thought from reading around that it might make the observable 'hot' so that both the progress updater and block2 see the throughput. block2 needs to receive the data for computation - the simple block2 in the example could be a stand-in for a whole chain of interlinked dataflow blocks to execute a computation. The observable of block1 meanwhile is for progress updates i.e. reports eventually to a UI.Dynamiter
It's not as simple as "hot" and "cold". Just keep in mind that .Publish().RefCount() allows for multiple observers of the same source stream and it has nothing to do with what the source is or how it gets its data.Lateen
I don't fully understand, however multiple observers is indeed what I want, i.e. block2 and progress. My guess however is that block2 isn't treated as an Observer because its link to block1 is not done in the RX way, but instead however TPL Dataflow implements LinkTo internally (and AsObservable). So we don't get successful MultiCast because it would need to be set up like that inside block1. Does that sound right?Dynamiter
Yes, that sounds better. The issue is that the observable only has one observer in your code. The Dataflow blocks are not observers of your observable. The only observer you have is created in the ForEachAsync method call (which you should use .Subscribe instead).Lateen
@Dynamiter did you try to set the blocks to non-greedy instead of limiting their capacity? This may be what you want, as block wouldn't ask for more messages if it's busy.Erectile
Non-greedy is the behaviour wanted, however it's only available on GroupingDataflowBlockOptions i.e. JoinBlock and BatchBlock. That design makes some sense to me, however if you put other types of block i.e. TransformBlock into your mesh before that, you lose the ability to transmit the non-greediness further back upstream. So I use BoundedCapacity=1 (zero is not allowed). Basically TPL Dataflow puts buffers, buffers everywhere, by default, but in downstream-constrained backpressure use-case I only want a buffer in certain spots, definitely not others.Dynamiter
@Dynamiter did you solve this in the end?Merrillmerrily
@Merrillmerrily - It was a little while ago but I'm pretty sure I just used a hacky approach where I got the LinkTo predicate to post each message to an observable and then return true. To hide the impurity of this it might be possible to create an extension method ObservableLinkTo which creates the observable and calls the regular LinkTo at the same time.Dynamiter
@Dynamiter thanks. I spoke to a friend and he's implementing a few new blocks that will allow this; I'll tell him to post them here.Merrillmerrily
@Dynamiter Any reason you never accepted the answer below? Seems like it answers your question perfectly.Penates
@ToddMenier - I didn't because the proposed solution either drops messages or loses backpressure, which were both unacceptable to me. I ended up writing something custom that did it. PS: If I was starting this today I'd look at a Reactive Streams implementation like Akka.NET Streams instead - looks a big improvement over TPL Dataflow.Dynamiter
@Dynamiter You're right - BroadcastBlock won't propagate data to targets that can't accept it immediately. I missed that. This is an excellent question, I'm where you were when you asked it - AsObservable feels like the perfect solution for publishing a stream of progress updates, but for the details. Sounds like you've moved on but if I come up with something that works and isn't too hacky I might post an answer, in case it helps others.Penates
@ToddMenier - yeah it does seems like a reasonable approach for tracking movement through the blocks, and the continual interest in this question means I should probably get round to posting a simplified version of what I did. I'll do that.Dynamiter
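The LinkTo-predicate hack mentioned in the comments above could be sketched as an extension method. This is only a sketch under assumptions: ObservableLinkTo is an invented name, it relies on the System.Reactive package, and note that Dataflow may evaluate a link's filter predicate more than once for the same message (e.g. if the target postpones it), so duplicate progress notifications are possible:

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ObservableLinkToExtensions
{
    // Hypothetical helper: links source to target as usual, but also pushes
    // every message the link offers into an IObservable<T> for progress
    // reporting. The predicate always returns true, so the message still
    // flows on to the target and back-pressure is preserved.
    public static IDisposable ObservableLinkTo<T>(
        this ISourceBlock<T> source,
        ITargetBlock<T> target,
        DataflowLinkOptions linkOptions,
        out IObservable<T> observable)
    {
        var subject = new Subject<T>();
        observable = subject;
        var link = source.LinkTo(target, linkOptions, item =>
        {
            subject.OnNext(item); // side-effect only; message is not consumed here
            return true;
        });
        // Complete the observable when the source block completes.
        source.Completion.ContinueWith(_ => subject.OnCompleted(),
            TaskScheduler.Default);
        return link;
    }
}
```

The impurity is hidden inside the extension method, but the duplicate-notification caveat means this is only suitable for coarse progress reporting, not exact message counting.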

The issue with your code is that you're wiring up two consumers of block1. Dataflow then just gives each value to whichever consumer is ready first.

So you need to broadcast the values from block1 into two other blocks to then be able to consume those independently.

Just a side note: don't use .Publish().RefCount(), as it doesn't do what you think. It effectively makes a run-once observable that, during that single run, allows multiple observers to connect and see the same values. It has nothing to do with what the source of the data is nor how the Dataflow blocks interact.

Try this code:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_broadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_broadcast);
var l2 = block_broadcast.LinkTo(block2);
var l3 = block_broadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

That gives me:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

Which is what I think you wanted.

Now, just as a further aside, using Rx for this might be a better option all around. It's much more powerful and declarative than any TPL or Dataflow option.

Your code boils down to this:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()))
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

That gives you pretty much the same result.

Lateen answered 16/6, 2017 at 5:23 Comment(7)
I think this is a reasonable approach, except that by introducing a buffer after the BroadcastBlock we lose all back-pressure in the system. I didn't specify that as a constraint on the question explicitly, however the intention of the BoundedCapacity=1 in block2 is to show that it's constrained. I might want to maintain back-pressure if e.g. this is just one of multiple processing paths, and upstream there is actually an option to send to other paths if this is full. By introducing a BufferBlock here, this path is now essentially Greedy and the work can't successfully be distributed.Dynamiter
@Dynamiter - The buffer block is only there for the observable. It shouldn't affect the rest of your data flow.Lateen
Ah sorry I misread that. I'll have to test your solution. I confess I don't actually know if there's a risk of broadcast_block dropping a message if block2 rejects it - could the broadcast_block's current message be displaced during that? I'd hope not but I'm not sure. If not then this is indeed a reasonable solution.Dynamiter
(with BoundedCapacity=1 added to broadcast_block)Dynamiter
I just tested this and unfortunately it did drop messages. I thought BroadcastBlock might be implemented such that in a back-pressure situation it would pass back to its upstream blocks 'I'm full' and so they wouldn't send it another message yet, but no, that's not the case.Dynamiter
@Dynamiter - Try using Rx. It creates back-pressure but doesn't drop values.Lateen
Oh? My understanding was that Rx.NET didn't have backpressure, at least not to the point where the producer becomes aware of the downstream blockage and can adjust behaviour. Though that is getting a little away from my original question here.Dynamiter

There are two options to consider when creating an observable dataflow block. You can either:

  1. emit a notification every time a message is processed, or
  2. emit a notification when a previously processed message, stored in the block's output buffer, is accepted by a linked block.

Both options have pros and cons. The first option provides timely but unordered notifications. The second option provides ordered but delayed notifications, and also must deal with the disposability of the block-to-block linking. What should happen with the observable, when the link between the two blocks is manually disposed before the blocks are completed?

Below is an implementation of the first option, which creates a TransformBlock together with a non-consuming IObservable of this block. There is also an implementation of an ActionBlock equivalent, based on the first one (although it could also be implemented independently by copy-pasting and adapting the TransformBlock implementation, since the code is not that long).

public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

Usage example in Windows Forms:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occurred: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

In the above example the type of the observable variable is:

IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

The two indices are 1-based.

Blithering answered 21/6, 2020 at 14:20 Comment(0)

Try replacing:

obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

with:

obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

I'd imagine the ForEachAsync method isn't hooking in properly; it may be firing, but something funky is going on with the async portion.

Brianbriana answered 16/6, 2017 at 3:37 Comment(2)
Thanks for the suggestion, however I get exactly the same behaviour.Dynamiter
Drats, having seen @VMAtm's answer, perhaps you could decouple the two blocks, use your observable as the sole consumer, and then pipe the observed values into the second block? That way you can use the observed value as much as you like.Brianbriana

By specifying BoundedCapacity for a block inside the chain, you create a situation where some of your messages are rejected by target blocks: once the buffer of the ActionBlock is full, further messages are declined.

By creating the observable from your block you also introduce a race condition: two consumers of your data are competing for messages. Blocks in TPL Dataflow propagate data to the first available consumer, which leads to the nondeterministic behaviour you observed.

Now, back to your problem. You can introduce a BroadcastBlock, as it provides a copy of the data to all consumers, not only the first one. But in that case you have to remove the buffer size limitation: a BroadcastBlock is like a TV channel; you cannot get a previous show, only the current one.
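A minimal sketch of that drop-on-busy behavior (the later comments confirm it): a BroadcastBlock keeps only the current message and never waits for a busy target, so a bounded downstream block misses messages and the producer sees no back-pressure. The exact messages that survive are timing-dependent.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

var broadcast = new BroadcastBlock<int>(i => i);
var slow = new ActionBlock<int>(i =>
{
    Thread.Sleep(50); // simulate a busy downstream consumer
    Console.WriteLine("got:" + i);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
broadcast.LinkTo(slow, new DataflowLinkOptions { PropagateCompletion = true });

// Post never blocks here: the broadcast block accepts every message,
// overwriting its current value if 'slow' hasn't taken the previous one.
for (var i = 1; i <= 5; i++) broadcast.Post(i);
broadcast.Complete();
slow.Completion.Wait();
// Which of the five messages reach 'slow' is timing-dependent; some are
// typically dropped, and the producer is never slowed down.
```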

Side notes: you do not check the return value of the Post method; consider using await SendAsync instead, and for a better throttling effect set the BoundedCapacity of the starting block of the chain, not of one in the middle.
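As a sketch of that last point, the question's Post loop could await SendAsync, so that a full bounded block exerts back-pressure on the producer instead of silently declining messages (this assumes an async context, e.g. an async method or top-level statements):

```csharp
using System.Linq;
using System.Threading.Tasks.Dataflow;

var block = new TransformBlock<int, int>(i => i + 20,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
block.LinkTo(DataflowBlock.NullTarget<int>(),
    new DataflowLinkOptions { PropagateCompletion = true });

foreach (var v in Enumerable.Range(1, 5))
{
    // Post would return false while the block is full and the message
    // would be lost; SendAsync instead waits asynchronously for capacity.
    await block.SendAsync(v);
}
block.Complete();
```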

Erectile answered 16/6, 2017 at 4:27 Comment(2)
Thanks, the fact that my dataflow system is downstream-constrained was intentional; imo that's a use-case where the Dataflow library shines. I will add a note to that effect. For that reason I wouldn't remove the BoundedCapacity=1 setting, and BroadcastBlock wouldn't be appropriate.Dynamiter
I still don't understand why you need such a restriction; can you elaborate on that? Dataflow adds only a small overhead if the buffer is limited.Erectile
