I have the following pseudocode:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });

var a = new ActionBlock<int>(async item =>
{
    await Task.Delay(500);
    Trace.TraceInformation(
        $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
    // Handle some logic, but it throws
    if (item >= 5) throw new Exception("Something bad happened");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> { queue };

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This condition is always false:
                // t (the BufferBlock) has no exceptions. The exception is raised in the downstream ActionBlock it sends to.
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // The exception is not rethrown here either
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}
The pipeline is configured like this: ActionBlock<int> => BufferBlock<int> => ActionBlock<int>.
The last ActionBlock<int> throws an exception, but it is not rethrown to the source block, where I would like to handle it.
How can this code be rewritten so that it handles exceptions correctly?
When you Post or SendAsync a message to a TPL block, it is only added to the block's buffer, and that's it; you get no reference to it, nor to any exception thrown. – Valid
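To illustrate the point in the comment above: a fault in a dataflow block surfaces on that block's own Completion task, and completion (including faults) only propagates downstream through links, never back upstream. One possible way to see the exception on the producer side is therefore to hold a reference to the last block and await its Completion, racing each SendAsync against it so the producer stops once the block has faulted. The following is only a minimal sketch under those assumptions, not necessarily the best fix; the names FaultObservationSketch, RunAsync, worker and itemCount are hypothetical, the broadcaster block is replaced by a plain loop for brevity, and the delay is shortened.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical names throughout; a sketch, not the original author's code.
public static class FaultObservationSketch
{
    // Sends 1..itemCount into the pipeline and returns the messages of any
    // exceptions the producer side managed to observe.
    public static async Task<List<string>> RunAsync(int itemCount)
    {
        var observed = new List<string>();

        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var worker = new ActionBlock<int>(async item =>
        {
            await Task.Delay(10); // shortened from the 500 ms in the question
            if (item >= 5) throw new InvalidOperationException("Something bad happened");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

        queue.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= itemCount; i++)
        {
            // Race the send against the worker's Completion. Once the worker
            // has faulted, the buffer is never drained again, so a bare
            // "await queue.SendAsync(i)" could wait forever on a full buffer.
            var send = queue.SendAsync(i);
            var first = await Task.WhenAny(worker.Completion, send);
            if (first == worker.Completion) break;
        }

        // Signal that no more items are coming; on the success path this
        // propagates through the link and lets the worker complete.
        queue.Complete();

        try
        {
            // The downstream fault is only visible here, on the worker's
            // own Completion task, not on the buffer in front of it.
            await worker.Completion;
        }
        catch (Exception e)
        {
            observed.Add(e.Message);
        }

        return observed;
    }
}
```

Calling `await FaultObservationSketch.RunAsync(10)` from an async method should return a list containing the worker's exception message. Note that after a fault, items still sitting in the buffer are never processed and the buffer's own Completion never finishes, so this sketch deliberately does not await it.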