RxJava2 UndeliverableException when orientation change is happening while fetching data

I get the following error if I change the orientation of my device while my app is fetching new redditNews.

  E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
    Process: com.spicywdev.schmeddit, PID: 26522
    io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | null
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:762)
     Caused by: java.io.InterruptedIOException
        at okhttp3.internal.http2.Http2Stream.waitForIo(Http2Stream.java:579)
        at okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:143)
        at okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:125)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
        at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall.execute(ExecutorCallAdapterFactory.java:91)
        at com.spicywdev.schmeddit.features.news.NewsManager$getNews$1.subscribe(NewsManager.kt:16)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

This is what the class looks like. The error happens in the function requestNews().

class NewsFragment : RxBaseFragment() {

    companion object {
        private val KEY_REDDIT_NEWS = "redditNews"
    }

    private var redditNews: RedditNews? = null
    private val newsManager by lazy { NewsManager() }

    override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        return container?.inflate(R.layout.news_fragment)
    }

    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)

        news_list.apply {
            setHasFixedSize(true)
            val linearLayoutManager = LinearLayoutManager(context)
            layoutManager = linearLayoutManager
            clearOnScrollListeners()
            addOnScrollListener(InfiniteScrollListener({ requestNews()}, linearLayoutManager))
        }

        initAdapter()

        if (savedInstanceState != null && savedInstanceState.containsKey(KEY_REDDIT_NEWS)) {
            redditNews = savedInstanceState.get(KEY_REDDIT_NEWS) as RedditNews
            (news_list.adapter as NewsAdapter).clearAndAddNews(redditNews!!.news)
        } else {
            requestNews()
        }
    }

    override fun onSaveInstanceState(outState: Bundle) {
        super.onSaveInstanceState(outState)
        val news = (news_list.adapter as NewsAdapter).getNews()
        if (redditNews != null && news.isNotEmpty()) {
            outState.putParcelable(KEY_REDDIT_NEWS, redditNews?.copy(news = news))
        }
    }

    private fun requestNews() {
        /**
         * first time will send empty string for after parameter.
         * Next time we will have redditNews set with the next page to
         * navigate with the after param.
         */
        val subscription = newsManager.getNews(redditNews?.after ?: "")
            .subscribeOn(Schedulers.io())
            .subscribe(
                {
                    retrievedNews ->
                    redditNews = retrievedNews
                    (news_list.adapter as NewsAdapter).addNews(retrievedNews.news)
                },
                {
                    e -> Snackbar.make(news_list, e.message ?: "WZF", Snackbar.LENGTH_LONG).show()
                }
        )
        subscriptions.add(subscription)
    }


    private fun initAdapter() {
        if (news_list.adapter == null) {
            news_list.adapter = NewsAdapter()
        }
    }
}

I'm pretty new to RxJava - I'd be really glad if someone can help me with this. Thank you.

Sindee answered 3/10, 2018 at 16:20 Comment(8)
Have you read the exception message? - Bharat
@Bharat Yes, I think I understand what the problem is. The exception doesn't reach its destination. But I don't really understand how to fix it in the context of RxJava. :/ - Sindee
There is a link in the error message pointing to the wiki to read. - Bharat
I tried to understand it, but for example I have no clue where to override the error handling of RxJava. Thank you for your help. - Sindee
There is an example of that in the wiki section: search for RxJavaPlugins.setErrorHandler. - Bharat
I read that part, but I don't know how to implement it globally. Where do I have to put this code to get it working for all RxJava error cases? - Sindee
That's a global handler; define it somewhere early in your application's lifecycle. - Bharat
Unfortunately the crashes persist, but thank you nonetheless. Maybe I will figure it out in the future, when I'm a bit more familiar with Rx. - Sindee

You can override the RxJava error handler as follows:

  1. Extend the Application class with your own MyApplication class.
  2. Then add this to the onCreate method of your application class:

    @Override
    public void onCreate() {
        super.onCreate();
        RxJavaPlugins.setErrorHandler(throwable -> {}); // nothing or some logging
    }
    
  3. Register the MyApplication class in the manifest:

    <application
        android:name=".MyApplication"
        ...>
    

For more information, please take a look at the RxJava wiki.
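
An empty handler also hides errors that point at real bugs. A more selective handler, along the lines of the example in the RxJava wiki, could ignore only errors that are a normal side effect of cancellation. Below is a minimal sketch of just that decision in plain Java, so it compiles without RxJava on the classpath. The class UndeliverableErrors and the method isSafeToIgnore are made-up names, not part of RxJava:

```java
import java.io.IOException;

/** Hypothetical helper: decides which undeliverable errors can be dropped quietly. */
final class UndeliverableErrors {

    private UndeliverableErrors() {}

    /**
     * @param cause the error after unwrapping, e.g. the result of
     *              UndeliverableException.getCause()
     * @return true if the error is a typical side effect of cancellation
     */
    static boolean isSafeToIgnore(Throwable cause) {
        // InterruptedIOException (the cause in the stack trace of the question)
        // is a subclass of IOException, so it is covered here.
        if (cause instanceof IOException) {
            return true;
        }
        // Blocking code that was interrupted by a dispose call.
        if (cause instanceof InterruptedException) {
            return true;
        }
        // Anything else (NullPointerException, IllegalStateException, ...)
        // may be a real bug and should at least be logged.
        return false;
    }
}
```

Inside the lambda passed to RxJavaPlugins.setErrorHandler you would first unwrap the throwable (use getCause() when it is an UndeliverableException), then log or rethrow anything for which isSafeToIgnore returns false.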

Slacks answered 3/10, 2018 at 18:4 Comment(1)
With Kotlin it is much simpler: RxJavaPlugins.setErrorHandler {} - Haukom

To catch errors of this kind, which may be raised by a third-party library or by your own code, use RxJavaPlugins. Put this code in the function (or wherever else) you use Rx, and any uncaught Rx error will be delivered to it, so you must check the exception type:

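RxJavaPlugins.setErrorHandler {
    if (it is UndeliverableException) {
        // The handler may run on a background scheduler thread,
        // so the Toast is posted to the main thread.
        Handler(Looper.getMainLooper()).post {
            Toast.makeText(view?.context, "UndeliverableException: " + it.message, Toast.LENGTH_LONG).show()
        }
        return@setErrorHandler
    }
}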

Episiotomy answered 12/2, 2020 at 8:38 Comment(0)

Maybe (not tested) you could replace onError in the Observable with onNext(tag), where the tag distinguishes a "normal onNext" from an "error onNext" (which replaces onError).

onNext(0) => an exception occurred (you can pass the exception details through a public static String)

onNext(1) => no error

emitter.onError(ex); => emitter.onNext(0);

And in the Observer:

@Override
public void onNext(Integer message) {
    if (message == 0)
        handleError();
    else
        handleNext();
}

Of course, it's not very "clean", but it can solve the problem...
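
Passing the exception through a static field can go wrong when several requests run at once. A variation on the same idea is to let the item itself carry either the value or the exception. This is only a sketch in plain Java; Outcome is a hypothetical helper class, not something RxJava provides:

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical wrapper: carries either a value or the error that replaced it. */
final class Outcome<T> {
    private final T value;
    private final Throwable error;

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    /** Wraps a normal item (the "onNext(1)" case). */
    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, null);
    }

    /** Wraps an exception so it travels as a normal item (the "onNext(0)" case). */
    static <T> Outcome<T> failure(Throwable error) {
        return new Outcome<>(null, Objects.requireNonNull(error, "error"));
    }

    boolean isFailure() {
        return error != null;
    }

    /** Calls exactly one of the two handlers, like the if/else in onNext. */
    void handle(Consumer<? super T> onValue, Consumer<? super Throwable> onError) {
        if (isFailure()) {
            onError.accept(error);
        } else {
            onValue.accept(value);
        }
    }
}
```

With this, the source would call emitter.onNext(Outcome.failure(ex)) instead of emitter.onError(ex), and the observer's onNext would call outcome.handle(value -> handleNext(), error -> handleError()). Items emitted after the consumer has disposed are dropped silently, so this path does not end in an UndeliverableException. RxJava 2 emitters also have isDisposed(), which can be checked before calling onError if you would rather keep real error signals.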

Tumblebug answered 21/11, 2018 at 7:33 Comment(0)
