RxJava: observeOn, subscribeOn, and doFinally, switching between IO and UI thread
Asked Answered
B

3

5

I am running into an issue where my observable is subscribed on an IO thread and observed on the android main (UI) thread but the doFinally operator is run on the IO thread and it needs to be run on the UI thread.

The usecase is almost exactly the same as this medium article.

I essentially want to show a ProgressBar when the Observable is subscribed to and hide the ProgressBar when the Observable is terminated or finished.

The error I am getting is: java.lang.IllegalStateException: The current thread must have a looper!

Can anyone help me move the doFinally action back to the UI thread which has a looper? Or am I missing some other piece of information?

EDIT The usecase workflow is:

-> Launch Activity

-> initialize

-> execute observable stream

-> Start new Activity and finish current activity

-> New activity

-> Start original activity and finish

-> repeat initialize

Thank you very much.

Details:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android sdk min 14 and target 25

Example Code

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Stack Trace:

FATAL EXCEPTION: RxCachedThreadScheduler-1
  Process: com.example.android.demo.customerfirst.alpha, PID: 16685
  java.lang.IllegalStateException: The current thread must have a looper!
      at android.view.Choreographer$1.initialValue(Choreographer.java:96)
      at android.view.Choreographer$1.initialValue(Choreographer.java:91)
      at java.lang.ThreadLocal$Values.getAfterMiss(ThreadLocal.java:430)
      at java.lang.ThreadLocal.get(ThreadLocal.java:65)
      at android.view.Choreographer.getInstance(Choreographer.java:192)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:600)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:575)
      at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)
      at android.animation.ValueAnimator.end(ValueAnimator.java:998)
      at android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)
      at android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)
      at android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)
      at android.view.View.dispatchVisibilityChanged(View.java:8643)
      at android.view.View.setFlags(View.java:9686)
      at android.view.View.setVisibility(View.java:6663)
      at android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator$3.run(ProductListPresenterMediator.java:56)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java:144)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.onComplete(ObservableDoFinally.java:94)
      at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)
      at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:84)
      at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
      at io.reactivex.Scheduler$1.run(Scheduler.java:138)
      at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
      at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
      at java.util.concurrent.FutureTask.run(FutureTask.java:237)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
      at java.lang.Thread.run(Thread.java:818)
Bittencourt answered 28/4, 2017 at 14:28 Comment(0)
B
4

The issue was occurring because I was not disposing the subscription when an activity was finished/destroyed.

Now each activity/view tells the presenter when they are stopped or destroyed and the presenter disposes of the subscription.

This appears to have solved my issue.

 @Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}
Bittencourt answered 28/4, 2017 at 15:16 Comment(0)
C
3

All you need to do is move the observeOn up the chain. The observeOn method changes the thread that onNext, onError, and onCompleted are called on which is internally how operations & side effects work (via lift)

listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })

            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
Calyptrogen answered 28/4, 2017 at 14:34 Comment(6)
The issue still occurs with change. I will add more information to the original ticket as maybe I am doing something wrong outside of this observable usecase.Bittencourt
Looks at the error stack trace (or post it) it should tell you where which line is calling on the wrong thread.Calyptrogen
@cryoxis I posted the stack trace. I feel like the action is coming back on a disposed thread that doesn't have a looper. I am not doing any disposing actions so I will go and update my activities/views and their presenters to dispose of the observable when they are stopped/paused/destroyed and get back to you. Thank you for the insight into the lifting mechanism. I'll reply with how it goes.Bittencourt
@Bittencourt You didn't reply. :)Pernicious
@Pernicious I had posted my own answer with what my problem and fix was. Is that not the correct thing to do?Bittencourt
@Bittencourt I didn't see the answer was also yours, oops. Ok, thanks, looking into it.Pernicious
B
3

Maybe it is too late. But check the documentation here about the doFinally They say explicitly"

Calls the specified action after this Observable signals onError or onCompleted or gets disposed by the downstream.

That means that doFinally it will be called anyway, and it is not warranted that you have a valid context.

I do not use doFinally for this things. I set the loading(false) onNext or onError always. Not pretty but effective.

Baudekin answered 7/1, 2019 at 11:48 Comment(1)
Yes. There is no guarantee doFinally would be called in the thread specified in observeOn. I personally prefer for such cases doAfterTerminate which is described in the docs as "to be called after this Single invokes either onSuccess or onError" that perfectly fits for me when working with LiveData.Tonietonight

© 2022 - 2024 — McMap. All rights reserved.