RxJava2 observable take throws UndeliverableException
Asked Answered
T

4

43

As I understand RxJava2 values.take(1) creates another Observable that contains only one element from the original Observable. Which MUST NOT throw an exception as it is filtered out by the effect of take(1) as it's happened second.

as in the following code snippet

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Output

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

My questions :

  1. Am I understanding it correct ?
  2. What's really happening to cause the exception.
  3. How to solve this from the consumer ?
Thousandth answered 20/4, 2017 at 16:29 Comment(1)
See FlowableEmitter.tryOnError and similar methods since 2.1.1 .Marden
G
63
  1. Yes, but because the observable 'ends' does not mean the code running inside create(...) is stopped. To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream.
  2. The exception is there because RxJava 2 has the policy of NEVER allowing an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an Exception occurs.
  3. The problem is the producer (Observable) and the consumer (Subscriber) disagreeing on when the stream ends. Since the producer is outliving the consumer in this case, the problem can only be fixed in the producer.
Gujarati answered 20/4, 2017 at 17:13 Comment(14)
would if (!o.isDisposed()) { o.onError(new Exception("Oops")); } be a correct way to handle that?Ascus
If it is acceptable for that exception to be lost in the case that the observable is no longer observed then yes. If the exception should really go somewhere then it should be called unconditionally.Gujarati
@Gujarati Is there a way to handle this from consumer ?Thousandth
No, this needs to be fixed in the producer since the consumer has declared itself as terminated.Gujarati
Thanks @Gujarati please update your answer to sum up all comments. I'll mark the question as answeredThousandth
I've added one more question to the listThousandth
if (!o.isDisposed()) { o.onError(new Exception("Oops")); } is not the correct way to handle this because of race conditions (o can be disposed between the if condition and the onError call and that's not theoretical, it happens in productive systems). See also this discussion: github.com/ReactiveX/RxJava/issues/4880Radack
@EmanuelMoecklin At the time of posting, that method was not available in a public release. It is indeed the correct way to handle it post-RxJava 2.1.1Gujarati
@Gujarati tryOnError was indeed introduced after the question/answer. Nevertheless the if (!o.isDisposed... approach wasn't correct before that either (I had crashes because of that). There were workarounds like overwriting the standard RxJava error handler to deal with it.Radack
@AbdElraoufSabri I threw away that code once the tryOnError was introduced. Could find it in some old commit but what's the point now when you have tryOnError?Radack
@EmanuelMoecklin I'm still getting the UndeliverableException error even when I use tryOnError! if (emitter.isDisposed.not()){emitter.tryOnError(getFailureError(data.error()))}Armourer
@Gujarati No, this needs to be fixed in the producer since the consumer has declared itself as terminated. How can I fix it on the producer side? Please read the above comment.Armourer
If you're using tryOnError then the origin of that exception lies elsewhere. Without the full stacktrace it would be impossible to answer.Gujarati
" To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream." - where is it documented that isDisposed() checked if downstream was ended?Haley
Z
22

@Kiskae in previous comment correctly answered about the reason why such exception can occurs.

Here the link to official doc about this theme: RxJava2-wiki.

Sometimes you cannot change this behaviour so there is a way how to handle this UndeliverableException's. Here is code snippet of how to avoid crashes and misbehaviour:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

This code taken from the link above.

Important note. This approach sets global error handler to RxJava so if you can to get rid of these exceptions - it would be better option.

Zito answered 20/3, 2018 at 15:14 Comment(2)
I can't to get rid of these exception and I need some data from this RX but when this error happen I can't get my data from Json ..what should I do now?:(Revenge
When I make some network request inside of Observable.create my subscriber is not disposed at the beginning of the network call and is already disposed when I am getting response on the call. Is there a way to do not get InterruptedException in RxJavaPlugins.setErrorHandler?Contumacious
B
6

Kotlin

I call this in MainActivity onCreate method

private fun initRxErrorHandler(){
    RxJavaPlugins.setErrorHandler { throwable ->
        if (throwable is UndeliverableException) {
            throwable.cause?.let {
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                return@setErrorHandler
            }
        }
        if (throwable is IOException || throwable is SocketException) {
            // fine, irrelevant network problem or API that throws on cancellation
            return@setErrorHandler
        }
        if (throwable is InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return@setErrorHandler
        }
        if (throwable is NullPointerException || throwable is IllegalArgumentException) {
            // that's likely a bug in the application
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        if (throwable is IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        Log.w("Undeliverable exception", throwable)
    }
}
Bendick answered 15/12, 2019 at 10:10 Comment(0)
C
0

While using observable.create() just go with tryOnError(). onError() doesn't guaranty that error will get handled. There are various error handling operators are there HERE

Commercial answered 6/11, 2019 at 8:18 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.