Exception handling in observable pipeline

I have created an observable pipeline in which each item is transformed into another by running a series of async methods.

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

I would like to make this fail-proof: if an exception is thrown while an item is being processed, I want to log the exception, ignore it, and resume with the next scan (the next item pushed by scanner.Scans).

The current code catches any exception, but the sequence finishes as soon as an exception is thrown.

How can I make it "swallow" the exception (logging it) and resume with the next item?

Dewie answered 13/5, 2021 at 11:18 Comment(6)
Wouldn't it be better to Retry rather than just logging? Better still, RetryWithBackoffStrategy. Cuts down on human intervention. - Ivon
The thing is that I don't want it to retry the errored item. I would like to just discard that item and go on to the next (but logging the exception). - Dewie
You don't want to retry a possible I/O error (e.g. UploadAsset)? You might find that waiting a bit succeeds. Errors might not always come from some compute operation (which might tend to fail repeatedly if it's duff data, so I agree with you there). - Ivon
@MickyD You're right. There are some exceptions under which I would like to retry, of course. A Retry would be necessary at some points of the pipeline. But there are some exceptions that I would like to ignore. In my case, when the item isn't found, an ApiException with a 404 error code will be thrown. I would like to ignore them in any of the steps of the pipeline (all steps involve a call to a RESTful API). - Dewie
Thanks. I get the problem now. +1 - Ivon
Related: How to implement a custom SelectMany operator that waits for all observable subsequences to complete? - Shorten
M
5

Rx is a functional paradigm so it's very useful to use a functional approach to solving this problem.

The answer is to introduce another monad that can cope with errors. Much as Nullable<T> lets a value type such as int also represent null, this is a class that can represent either a value or an exception.

public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}


public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}

So, let's start by creating an Rx query that throws an exception.

IObservable<int> query =
    Observable
        .Range(0, 10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

If I run the observable I get this:

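[Image "Normal Query": the values 25, 30, 38, 55 and 105 are produced, then the sequence ends with a DivideByZeroException.]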

Let's transform this with Exceptional and see how it allows us to continue processing when an error occurs.

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));

Now when I run it I get this:

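[Image "Query with Exceptional": all ten results arrive. The sixth carries a DivideByZeroException and the others carry values.]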

Now I could test each result, check whether HasException is true, and log each exception, while the observable continues.
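
To make that logging step concrete, here is a minimal sketch of subscribing to such a query and recording each captured exception. It assumes the System.Reactive package. The `Exceptional<T>` in the block is a condensed stand-in for the class above, included only so the snippet compiles on its own; `LoggingDemo`, `Run` and the log format are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Condensed stand-in for the Exceptional<T> class above (illustration only).
public sealed class Exceptional<T>
{
    public bool HasException => Exception != null;
    public Exception Exception { get; }
    public T Value { get; }

    public Exceptional(T value) { Value = value; }
    public Exceptional(Exception exception) { Exception = exception; }

    // Apply m to the value; capture anything it throws instead of propagating it.
    public Exceptional<U> Select<U>(Func<T, U> m)
    {
        if (HasException) return new Exceptional<U>(Exception);
        try { return new Exceptional<U>(m(Value)); }
        catch (Exception ex) { return new Exceptional<U>(ex); }
    }
}

public static class LoggingDemo
{
    // Runs the sample query and returns the lines it logged.
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable
            .Range(0, 10)
            .Select(x => new Exceptional<int>(x))
            .Select(x => x.Select(y => 5 - y))
            .Select(x => x.Select(y => 100 / y))
            .Select(x => x.Select(y => y + 5))
            .Subscribe(
                r => log.Add(r.HasException
                    ? $"Logged {r.Exception.GetType().Name}"   // failed item: log and move on
                    : $"Value {r.Value}"),                     // successful item
                ex => log.Add("OnError (not expected)"),       // the pipeline itself never errors
                () => log.Add("Completed"));

        return log;
    }
}
```

With the sample data, the sixth item is logged as a DivideByZeroException and the remaining four items still arrive, followed by normal completion.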

It's also easy to clean up the query to look almost the same as the original by introducing one further extension method.

    public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));

This combines observables and exceptionals into a single Select operator.

Now the query can look like this:

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

I get the same result as before.


Finally, I could get this all working with query syntax by adding two more extension methods:

public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));

This allows:

IObservable<Exceptional<int>> query =
    from n in Observable.Range(0, 10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;

Again, I get the same results as before.

Metopic answered 14/5, 2021 at 4:0 Comment(5)
This approach reminds me of Stephen Cleary's minimalistic Try library. - Shorten
@TheodorZoulias - It's basically the same thing. That monad goes by many names. - Metopic
I would absolutely rate this answer as Exceptional. Thanks @Enigmativity. I consider myself one of your most fervent followers. If only you had some blog or publications where others could learn from you, apart from this site. We all need mentors like you. - Dewie
@Dewie - Occasionally I fluke a good answer. ;-) I appreciate your comments, but I don't do a blog or anything like it. I really only use SO as my community tech outlet these days. I'm happy to help. :-) - Metopic
@Metopic Sorry to ask, but I just posted a question that I suspect you would know the answer to in less than a second (I keep seeing your name pop up when people are asking about observables, and my question is based on this answer) ... #74554378 - Dilettante
J
3

The question rests on a fundamental misunderstanding: according to the Observable contract, a well-behaved observable terminates after an OnError notification. In your case there is no "just log and continue" option, because there's nothing to continue on. The observable that throws an exception via an OnError is done, kaput, finito, gone forever.

A comment mentioned Retry, which may be applicable: If you have an observable pipeline like so:

someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2
    .Subscribe(e => {});

Then the exception could occur in one of the operators, killing the pipeline, but the source could still be alive. Retry will then resubscribe to the source, building a new pipeline with the same functions.
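
As a rough sketch of that idea, the following assumes the System.Reactive package and uses a `Subject<int>` as the hot source. `RetryDemo`, `Run` and the failure rule inside `g` are illustrative names and choices, not part of the question's code. `Do` records the error just before `Retry` resubscribes to the still-living source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RetryDemo
{
    // Pushes four items through a pipeline whose operator fails on 13.
    public static List<string> Run()
    {
        var output = new List<string>();
        var source = new Subject<int>(); // hot: stays alive when a subscriber's pipeline fails

        // Hypothetical suspect operator: errors for multiples of 13.
        Func<int, IObservable<int>> g = i =>
            i % 13 == 0
                ? Observable.Throw<int>(new InvalidOperationException($"bad item {i}"))
                : Observable.Return(i);

        using (source
            .SelectMany(e => g(e))
            .Do(_ => { }, ex => output.Add($"Logged: {ex.Message}")) // log before Retry swallows it
            .Retry()                                                 // resubscribe to the source
            .Subscribe(v => output.Add(v.ToString())))
        {
            source.OnNext(1);
            source.OnNext(12);
            source.OnNext(13); // kills the current pipeline; Retry builds a new one
            source.OnNext(15);
        }

        return output;
    }
}
```

Because the source is hot, the failed item is not replayed, so the effect here is "log and carry on". Retry resubscribes to everything upstream, though, so any inner sequences still in flight are abandoned, and a cold source would start again from its first item.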

You can try to 'cheat' the Observable contract by using Materialize and Dematerialize, but you would be swimming upstream. The trick with cheating is to make sure that no part of the pipeline sees a 'raw' OnError, because any operator that does will terminate. Instead, Materialize turns an OnError into a Notification, which doesn't blow up. That would look like this:

Given a well-behaved pipeline like this:

var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
    if(i % 13 == 0)
        return Observable.Throw<int>(new Exception());
    return Observable.Return(i);
});

var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

...you can cheat like this:

var p2 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SuspectSelectMany(e => g(e), LogException) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

public static class X
{
    public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
    {
        return source
            .SelectMany(n => n.Kind == NotificationKind.OnCompleted
                ? Observable.Empty<Notification<T>>()
                : Observable.Return(n)
            );
    }
    
    public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
    {
        var x = source
            // The source itself is not materialized: Notification.Value throws for OnCompleted/OnError notifications
            .SelectMany(e => selector(e).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
            .SelectMany(e =>
            {
                if (e.Kind == NotificationKind.OnError)
                {
                    handler(e.Exception);
                    return Observable.Empty<Notification<U>>();
                }
                else
                    return Observable.Return(e);
            }) //error logging/suppression
            .Dematerialize();
        return x;
    }
}

Then given the following runner code:

someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);

p1 will bomb. p2 will produce the following output:

1
12
Exception
15
Jamnis answered 13/5, 2021 at 13:30 Comment(3)
Wow, I can't believe that such a common use case is so complex to achieve. Do you think there's a more generic way to cover this than "hacking" the sequence with Materialize? - Dewie
Well, I wrapped it in a function, so that's generic. - Jamnis
More clearly though, the use case isn't (or shouldn't be) common. Pass-through exception handling on an operator should happen in an operator, not in the pipeline. What you're asking for is the equivalent of a try-catch formation outside of a for-loop, but handling it in such a way that the for-loop continues on exception. If you wanted that, you would put the try-catch inside the for-loop, not outside. - Jamnis
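
The loop analogy in the last comment can be made concrete. This is a minimal sketch in plain C# with no Rx involved; `LoopAnalogy`, `Process` and the failure rule are hypothetical and exist only to show the difference between the two placements.

```csharp
using System;
using System.Collections.Generic;

public static class LoopAnalogy
{
    // Hypothetical per-item work: fails for multiples of 13.
    static int Process(int i) =>
        i % 13 == 0 ? throw new InvalidOperationException($"bad item {i}") : i;

    // try/catch outside the loop: the first failure ends the whole loop.
    public static List<string> CatchOutside(IEnumerable<int> items)
    {
        var output = new List<string>();
        try
        {
            foreach (var i in items)
                output.Add(Process(i).ToString());
        }
        catch (Exception ex)
        {
            output.Add($"Logged: {ex.Message}");
        }
        return output;
    }

    // try/catch inside the loop: the failing item is logged and skipped.
    public static List<string> CatchInside(IEnumerable<int> items)
    {
        var output = new List<string>();
        foreach (var i in items)
        {
            try
            {
                output.Add(Process(i).ToString());
            }
            catch (Exception ex)
            {
                output.Add($"Logged: {ex.Message}");
            }
        }
        return output;
    }
}
```

For the items 1, 12, 13, 15, `CatchOutside` stops after logging the failure on 13, whereas `CatchInside` logs it and still processes 15. Handling the error inside each operator of an observable pipeline corresponds to the second form.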
S
1

You could use the application-specific operator LogAndIgnoreError below:

/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
    return source.Catch((Exception error) =>
    {
        // Application-specific logging
        Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
        return Observable.Empty<T>();
    });
}

You could then attach this operator to any sequence whose error you would like to ignore.

Usage example:

IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b).LogAndIgnoreError())
    .SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
    .SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
    .SelectMany(b => UploadAsset(b).LogAndIgnoreError())
    .Select(assignment => new Summary())
    .LogAndIgnoreError();
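
Where the operator is attached matters. The sketch below assumes the System.Reactive package and compares a handler on each inner sequence with a handler only at the end of the pipeline. `PlacementDemo`, `Step` and `Swallow` are hypothetical names; `Swallow` mirrors LogAndIgnoreError but writes to a list so that the effect can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PlacementDemo
{
    // Hypothetical processing step: fails for the item 2.
    static IObservable<int> Step(int i) =>
        i == 2
            ? Observable.Throw<int>(new InvalidOperationException("item 2 failed"))
            : Observable.Return(i * 10);

    // Same idea as LogAndIgnoreError, logging into a list instead of the console.
    static IObservable<T> Swallow<T>(this IObservable<T> source, List<string> log) =>
        source.Catch((Exception error) =>
        {
            log.Add(error.Message);
            return Observable.Empty<T>();
        });

    public static (List<int> inner, List<int> outer) Run()
    {
        var log = new List<string>();
        var inner = new List<int>();
        var outer = new List<int>();

        // Handler on each inner sequence: only the failing item is dropped.
        Observable.Range(1, 3)
            .SelectMany(i => Step(i).Swallow(log))
            .Subscribe(v => inner.Add(v));

        // Handler only at the end: the error is logged, but the sequence ends there.
        Observable.Range(1, 3)
            .SelectMany(i => Step(i))
            .Swallow(log)
            .Subscribe(v => outer.Add(v));

        return (inner, outer);
    }
}
```

In this sketch the first pipeline yields 10 and 30, while the second yields only 10 and then completes. This is why the usage example attaches the operator inside every SelectMany rather than relying on the final call alone.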
Shorten answered 14/5, 2021 at 7:40 Comment(3)
The observable still terminates in this scenario, right? So it doesn't fix the OP's problem. - Gypsy
@Gypsy the resulting obs sequence terminates when the source scanner.Scans sequence has completed, and all the items that have been emitted from the scanner.Scans sequence have been either processed or failed. At least this is what the code in my answer is supposed to do, and what the OP has asked. Have you tested the code and observed a different behavior? - Shorten
Oops, sorry, I missed that your extension method is applied to all the SelectMany calls. In my own code I don't have an observable of observables and was just trying to use the last LogAndIgnoreError. - Gypsy
