Global per-block error handling in a Dataflow pipeline

I am designing a long-running Dataflow pipeline that consists of multiple blocks. Items are fed to the input block of the pipeline, eventually make their way through it, and are displayed in the UI at the end (as a courtesy to the user -- the pipeline's real job is to save processing results to disk).

The lambda functions inside the pipeline blocks can throw exceptions, for a variety of reasons (bad input, network failure, error during calculation, whatever). In this case, instead of faulting the entire pipeline, I'd like to kick out the offending item, and display it in the UI under "Errors".

What's the best way to do that? I understand that I can wrap every single lambda function in a try/catch:

var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...);

var workerBlock = new TransformBlock<WorkItem, WorkItem>(item =>
{
    try {
        return DoStuff(item);
    } catch (Exception ex) {
        // Report the failed item; downstream blocks then receive null.
        errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
        return null;
    }
});

But I have about 10 blocks in the pipeline, and copy/pasting that code into each one seems silly. Also, I don't like the idea of returning null, since now all of the downstream blocks will have to check for it.

My next best idea is to create a function that returns a lambda that does the wrapping for me:

  private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
  {
     return arg =>
     {
        try {
           return f(arg);
        } catch (Exception ex) {
           // Explicit type arguments so the tuple matches the error block's input type.
           errorLoggingBlock.SendAsync(Tuple.Create<WorkItem, Exception>(arg, ex));
           return default(TResult);
        }
     };
  }

But this seems a bit too meta. Is there a better way?

Coycoyle answered 12/8, 2015 at 5:13 Comment(1)
You could take a look at Stephen Cleary's minimalistic Try library. It lets you pass a message through all the blocks of a pipeline, and then observe any exception that occurred for this message at the end. - Melanimelania

That's a very interesting subject.

You can define filters when you link blocks which means you can divert error results to error handling blocks. To do that, blocks should return "meta" objects that contain both their processing results and at least a fail/success indicator.

This idea is better described in Railway Oriented Programming, where each function in a chain processes successful results or diverts failed results to a "failed track" for eventual logging.

In practical terms, this means that you should add two links after each block: one with a filter condition that diverts to an error handling block, and one default link that goes to the next step in the flow.
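As a rough illustration of the two-link arrangement, here is a minimal sketch. It assumes the System.Threading.Tasks.Dataflow package is referenced; `Result<T>`, `RoutingDemo` and the string-parsing step are hypothetical names made up for this example and are not part of the question's pipeline.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical result wrapper: holds either a value or the exception that prevented it.
public sealed class Result<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool IsSuccess => Error == null;

    private Result(T value, Exception error) { Value = value; Error = error; }

    public static Result<T> Ok(T value) => new Result<T>(value, null);
    public static Result<T> Fail(Exception error) => new Result<T>(default(T), error);
}

public static class RoutingDemo
{
    // Parses each input; successes and failures end up in separate blocks.
    public static async Task<(int[] Ok, string[] Failed)> RunAsync(params string[] inputs)
    {
        var ok = new ConcurrentQueue<int>();
        var failed = new ConcurrentQueue<string>();

        var parse = new TransformBlock<string, Result<int>>(text =>
        {
            try { return Result<int>.Ok(int.Parse(text)); }
            catch (Exception ex) { return Result<int>.Fail(ex); }
        });

        // Stand-ins for "the next step in the flow" and "the error handling block".
        var next = new ActionBlock<Result<int>>(r => ok.Enqueue(r.Value));
        var errors = new ActionBlock<Result<int>>(r => failed.Enqueue(r.Error.GetType().Name));

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        // Filtered link first: failed results are diverted to the error block.
        parse.LinkTo(errors, options, r => !r.IsSuccess);
        // Default link: everything the filter did not take goes to the next step.
        parse.LinkTo(next, options);

        foreach (var input in inputs) parse.Post(input);
        parse.Complete();
        await Task.WhenAll(next.Completion, errors.Completion);

        return (ok.ToArray(), failed.ToArray());
    }
}
```

Links are offered messages in the order they were added, so the filtered link has to come before the unfiltered one; otherwise the default link would accept the failures as well.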

You could even combine the two ideas to handle partial failures. A partial failure result would contain both a failure indicator and a payload. You could divert the result to a logging block before passing it on to the next step.

I've found it's much easier to be explicit about the status of each message rather than try to determine its status by checking for null, missing values etc. This means that blocks should wrap their results in "envelope" objects that contain status flags, the results and/or any errors.
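One possible shape for such an envelope is sketched below, together with a helper that turns a throwing function into one that always returns an envelope. `Envelope<TIn, TOut>` and `EnvelopeFactory.Wrap` are hypothetical names chosen for this sketch; the helper is just the question's `HandleErrors` idea adapted to return an envelope instead of a default value.

```csharp
using System;

// Hypothetical envelope: keeps the original input next to the outcome,
// so an error-handling block can report which item failed and why.
public sealed class Envelope<TIn, TOut>
{
    public TIn Input { get; }
    public TOut Output { get; }
    public Exception Error { get; }
    public bool Succeeded => Error == null;

    private Envelope(TIn input, TOut output, Exception error)
    {
        Input = input;
        Output = output;
        Error = error;
    }

    public static Envelope<TIn, TOut> Success(TIn input, TOut output) =>
        new Envelope<TIn, TOut>(input, output, null);

    public static Envelope<TIn, TOut> Failure(TIn input, Exception error) =>
        new Envelope<TIn, TOut>(input, default(TOut), error);
}

public static class EnvelopeFactory
{
    // Wraps a function that may throw into one that always returns an envelope.
    public static Func<TIn, Envelope<TIn, TOut>> Wrap<TIn, TOut>(Func<TIn, TOut> f)
    {
        return input =>
        {
            try { return Envelope<TIn, TOut>.Success(input, f(input)); }
            catch (Exception ex) { return Envelope<TIn, TOut>.Failure(input, ex); }
        };
    }
}
```

The wrapped delegate can be handed straight to a block constructor, for example `new TransformBlock<string, Envelope<string, int>>(EnvelopeFactory.Wrap<string, int>(int.Parse))`, so the try/catch is written only once.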

Epaulet answered 12/8, 2015 at 15:52 Comment(1)
Thanks, I think I understand the concept, but this doesn't answer my question. How can I automatically ensure that all of my blocks emit the error-signalling item whenever their processing functions throw errors? As I said in the question, I can manually wrap them all in try/catch, but there must be a better way... - Coycoyle
