RxJava observeOn and subscribeOn in Retrofit

observeOn: This method simply changes the thread of all operators further downstream (https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a)

When calling an API, I want the communication with the server to run on an IO thread and the result to be handled on the main thread.

I see the code below in many tutorials, and no doubt it is correct, but my understanding is the opposite, so I'd like to know what I'm misunderstanding.

requestInterface.callApi()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe()

observeOn(AndroidSchedulers.mainThread())

: observeOn changes the thread of all operators further downstream, but in this example the actual API call sits above observeOn, so how is it affected?

.subscribeOn(Schedulers.io())

: This is the part I find strange. The result needs to be handled on the main thread, so why subscribe on the IO thread?

Please tell me what I'm misunderstanding.

Hollandia answered 5/12, 2017 at 2:58 Comment(0)

Basically, we have

Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer

Example

requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable

From http://reactivex.io/documentation/operators/subscribeon.html:

SubscribeOn

  • SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called

ObserveOn (affects two things)

  • It instructs the Observable to send notifications to Observers on a specified Scheduler.

  • ObserveOn affects the thread that the Observable will use below where that operator appears

Example

registerUserReturnedObserverble()  // runs on a worker thread because of subscribeOn(Schedulers.io()) (line 5)
.andThen(loginReturnObserverble()) // runs on a worker thread because of subscribeOn(Schedulers.io()) (line 5)
.observeOn(AndroidSchedulers.mainThread())
.andThen(getUserDataReturnObserverble()) // runs on the main thread because observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
    // runs on the main thread because of observeOn(AndroidSchedulers.mainThread())
});
News answered 5/12, 2017 at 3:32 Comment(1)
"Observer observe Observable and Observable subscribe Observer": this is incorrect. The Rx project's official page says "an observer ... must first subscribe to that Observable". - Plication
  • subscribeOn(Schedulers.io()): This tells the Observable to run the task on a background thread.
  • observeOn(AndroidSchedulers.mainThread()): This tells the Observer to receive the data on the Android UI thread so that you can take any UI-related actions.
Nicoline answered 25/4, 2018 at 7:2 Comment(0)

The following covers the different cases that can arise when using observeOn() and/or subscribeOn().

  1. subscribeOn affects upstream operators (operators above the subscribeOn)
  2. observeOn affects downstream operators (operators below the observeOn)
  3. If you don’t specify threading in RxJava (that is, you specify neither subscribeOn nor observeOn), the data will be emitted and processed on the current scheduler/thread (usually the main thread). For instance, all operators in the chain below will be processed by the current thread (the main thread in the case of Android).
Observable
.just("big", "bigger", "biggest")    
.map(String::length)    
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }
  4. If only subscribeOn is specified, all operators will be executed on that thread
Observable
.just("big", "bigger", "biggest") 
.subscribeOn(Schedulers.io())    
.map(String::length)    
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

The data emission (just) and the map and filter operators will all be executed on the io scheduler, as directed by subscribeOn.

  5. If only observeOn is specified, the operators above observeOn will be executed on the current thread, and only the operators below observeOn will be switched to the thread specified by observeOn
Observable
.just("big", "bigger", "biggest")   
.map(String::length)
.observeOn(Schedulers.computation())     
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

The data emission (just) and map will be executed on the current thread.

filter will be executed on the computation scheduler, as directed by the observeOn placed above it.

  6. If both subscribeOn and observeOn are specified, all operators below observeOn are switched to the thread specified by observeOn, and all operators above observeOn run on the thread specified by subscribeOn. This holds true regardless of the order in which you specify subscribeOn and observeOn
Observable
.just("big", "bigger", "biggest")   
.subscribeOn(Schedulers.io()) 
.map(String::length)
.observeOn(Schedulers.computation())     
.filter { it > 6 }    
.subscribe { length -> println("item length $length") }

The data emission (just) and the map operator will be executed on the io scheduler, as directed by subscribeOn.

filter will be executed on the computation scheduler, as directed by the observeOn placed above it.

The thread usage would be the same even if subscribeOn were called after observeOn.

Observable
.just("big", "bigger", "biggest")  
.map(String::length)
.observeOn(Schedulers.computation()) 
.filter { it > 6 }   
.subscribeOn(Schedulers.io()) 
.subscribe { length -> println("item length $length") }
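Why the position of subscribeOn does not matter while the position of observeOn does can be reasoned about with a toy model. The sketch below is plain Java and is not RxJava's actual implementation; MiniObservable, ThreadingDemo, and the "io"/"main" thread names are all made up for illustration. It assumes only that subscribing travels up the chain toward the source, while items travel down the chain toward the subscriber.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model only: NOT RxJava's real implementation. All names are hypothetical.
class MiniObservable<T> {
    // A source pushes its items to a consumer once it is subscribed to.
    interface Source<T> { void subscribe(Consumer<T> downstream); }

    private final Source<T> source;

    MiniObservable(Source<T> source) { this.source = source; }

    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>(down -> { for (T item : items) down.accept(item); });
    }

    <R> MiniObservable<R> map(Function<T, R> fn) {
        return new MiniObservable<R>(down -> source.subscribe(item -> down.accept(fn.apply(item))));
    }

    // subscribeOn: moves the subscription (which travels upstream) onto the executor.
    MiniObservable<T> subscribeOn(ExecutorService executor) {
        return new MiniObservable<T>(down -> executor.execute(() -> source.subscribe(down)));
    }

    // observeOn: moves each emission (which travels downstream) onto the executor.
    MiniObservable<T> observeOn(ExecutorService executor) {
        return new MiniObservable<T>(down ->
            source.subscribe(item -> executor.execute(() -> down.accept(item))));
    }

    void subscribe(Consumer<T> consumer) { source.subscribe(consumer); }
}

class ThreadingDemo {
    // Builds a chain shaped like the question's and returns a log of "stage@thread".
    static List<String> run(boolean subscribeOnLast) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the API call: records the thread it runs on.
        MiniObservable<String> api = MiniObservable.just("response")
            .map(s -> { log.add("callApi@" + Thread.currentThread().getName()); return s; });

        MiniObservable<String> chain = subscribeOnLast
            ? api.observeOn(main).subscribeOn(io)
            : api.subscribeOn(io).observeOn(main);

        chain.subscribe(s -> {
            log.add("handle@" + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return log;
    }
}
```

In this model both ThreadingDemo.run(true) and ThreadingDemo.run(false) return [callApi@io, handle@main]: the subscription always reaches the source on the "io" thread, and the item hops to "main" only at the point where observeOn sits.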
Goltz answered 14/5, 2020 at 16:13 Comment(1)
"subscribeOn affects upstream operators (operators above the subscribeOn)" => I think subscribeOn affects all operators in the chain (if there is no observeOn), not only the 'upstream' operators? - Naturally

Here is a sample:

      getCardsObservable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new rx.Observer<List<Card>>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    listener.onError(e.getMessage());
                }

                @Override
                public void onNext(List<Card> cards) {
                    listener.onSuccess(cards);
                }
            });

subscribeOn --> the thread on which the call is executed, similar to running an AsyncTask

observeOn --> the thread on which the responses are observed and processed, here the UI thread

subscribe --> the observer callbacks
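For readers who know java.util.concurrent better than Rx, the same split can be sketched with CompletableFuture. This is only an analogy, not RxJava; AsyncAnalogy, fetchAndHandle, and the "io"/"ui" thread names are hypothetical, and a real Android app would post to the actual main looper rather than a plain executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: uses java.util.concurrent, not RxJava. All names are hypothetical.
class AsyncAnalogy {
    // Returns { thread that made the call, thread that handled the result }.
    static String[] fetchAndHandle() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        try {
            return CompletableFuture
                // Like subscribeOn(Schedulers.io()): the call itself runs on the "io" thread.
                .supplyAsync(() -> Thread.currentThread().getName(), io)
                // Like observeOn(AndroidSchedulers.mainThread()): the result is handled on "ui".
                .thenApplyAsync(
                    callThread -> new String[] { callThread, Thread.currentThread().getName() },
                    ui)
                .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }
}
```

Calling AsyncAnalogy.fetchAndHandle() yields "io" for the call and "ui" for the handler, which mirrors the subscribeOn/observeOn pairing in the sample.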

Monson answered 25/4, 2018 at 6:53 Comment(0)
