TPL Dataflow and exception handling in downstream blocks

I have the following pseudocode:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This condition is always false:
                // t (the BufferBlock) never faults. The exception is raised in the downstream ActionBlock that it feeds.
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // The exception is not rethrown here either
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

The pipeline is configured like this: ActionBlock<int> => BufferBlock<int> => ActionBlock<int>.

The last ActionBlock<int> throws an exception, but it is not rethrown in the source block, where I would like to handle it.

How can this code be rewritten so that it handles exceptions correctly?

Girandole answered 7/9, 2017 at 20:12 Comment(2)
You cannot handle exceptions from other blocks. When you Post or SendAsync a message to a TPL block, it only adds the message to its buffer, and that's it; you have no reference to the message or to the exception thrown. - Valid
It does work if `t` were another action block raising the exception. My question is more about how to design pipelines with nice exception handling instead of spreading try/catch blocks in many places. - Radiothorium

You can find the official guidelines for this topic here. The overall solution is to subscribe to the Completion task of every block and check its state, and, if needed, to replace the faulted block (which means you should also store references to all the blocks). Please refer to the whole article for more information.
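
As a rough illustration of "subscribing to every block's Completion task", here is a minimal sketch. The names CompletionMonitor and ObserveAsync are hypothetical helpers invented for this example, not part of TPL Dataflow, and the sketch assumes the System.Threading.Tasks.Dataflow package is referenced. It only reports the final state of each block; replacing a faulted block is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionMonitor
{
    // Hypothetical helper: waits until every block has finished and returns
    // the final TaskStatus of each block's Completion task, keyed by name.
    public static async Task<Dictionary<string, TaskStatus>> ObserveAsync(
        IReadOnlyDictionary<string, IDataflowBlock> blocks)
    {
        // One continuation per block; it runs whether the block completed,
        // faulted, or was canceled, and never throws itself.
        var watchers = blocks.Select(pair => pair.Value.Completion.ContinueWith(
            t => new KeyValuePair<string, TaskStatus>(pair.Key, t.Status),
            TaskContinuationOptions.ExecuteSynchronously));

        var results = await Task.WhenAll(watchers);
        return results.ToDictionary(r => r.Key, r => r.Value);
    }

    public static async Task Main()
    {
        var ok = new ActionBlock<int>(n => { });
        var bad = new ActionBlock<int>(n =>
        {
            if (n > 0) throw new InvalidOperationException("boom");
        });

        ok.Post(1);
        ok.Complete();
        bad.Post(1); // the delegate throws, so this block faults on its own

        var statuses = await ObserveAsync(new Dictionary<string, IDataflowBlock>
        {
            ["ok"] = ok,
            ["bad"] = bad
        });

        foreach (var s in statuses)
            Console.WriteLine($"{s.Key}: {s.Value}");
    }
}
```

A caller that needs the exception itself, rather than just the status, can read it from the faulted block's Completion.Exception property.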

Behaviors of a network with Faulted blocks

  1. Reserved Messages
    In order to avoid message corruption, a faulted block should clear its message queues and move into a Faulted state as soon as possible. There is a single scenario that does not obey to this rule: a source block holding a message reserved by a target. If a block that encounters an internal exception has a message that was reserved by a target, the reserved message must not be dropped, and the block should not be moved into the Faulted state until the message is released or consumed.

  2. Hanging Networks
    ...

    • Keep a reference to all the blocks in the network and use Task.WaitAll or Task.WhenAll to wait for them (synchronously or asynchronously). If a block faults, its Completion task will complete in the Faulted state.
    • Use DataflowLinkOptions with PropagateCompletion == true when building a linear network. That will propagate block completion from source to target. In this case it is enough to wait on the network leaf block.
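
The second bullet can be sketched against the pipeline from the question. This is only one possible arrangement, under stated assumptions: LeafCompletionDemo and RunAsync are hypothetical names, the broadcaster block is replaced by a plain loop for brevity, and the System.Threading.Tasks.Dataflow package is assumed to be referenced. Because completion propagates only from source to target, the BufferBlock never learns that the ActionBlock faulted, so the producer races each SendAsync against the leaf block's Completion task to avoid waiting forever on a full buffer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LeafCompletionDemo
{
    // Builds BufferBlock -> ActionBlock, feeds 1..10, and returns the message
    // of the exception observed on the leaf block (or null if there was none).
    public static async Task<string> RunAsync()
    {
        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var worker = new ActionBlock<int>(item =>
        {
            if (item >= 5) throw new InvalidOperationException("Something bad happened");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        queue.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 10; i++)
        {
            // Stop producing as soon as the leaf finishes (here: faults)
            // while a send is still pending.
            var send = queue.SendAsync(i);
            if (await Task.WhenAny(send, worker.Completion) != send) break;
        }
        queue.Complete();

        try
        {
            // Awaiting the leaf rethrows the exception that faulted it.
            await worker.Completion;
            return null;
        }
        catch (Exception e)
        {
            return e.Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync() ?? "completed without errors");
    }
}
```

With this shape there is a single try/catch around the leaf's Completion instead of one around every send.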
Valid answered 7/9, 2017 at 23:28 Comment(2)
What do you mean by "network leaf block"? Is this the last block of your dataflow pipeline? - Midden
@user2972161 It's not me, it's a quote from the official docs. A leaf is a block in the graph that does not have any child blocks, so, yes, in a linear pipeline it's the last block in the sequence (a branching pipeline can have more than one leaf; it depends on your pipeline). - Valid
