The following case specify all the different cases that can arise while using observeOn() and/Or subscribeOn().
subscribeOn
affects upstream operators (operators above the subscribeOn)
observeOn
affects downstream operators (operators below the observeOn)
- If you don’t specify threading in RxJava (if you don’t specify
subscribeOn
, observeOn
or both), the data will be emitted and processed by the current scheduler/thread (usually the main thread). For instance, all operators in the chain below will be processed by the current thread( Main thread in case of Android).
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
- If only subscribeOn is specified, all operators will be be executed on that thread
Observable
.just("big", "bigger", "biggest")
.subscribeOn(Schedulers.io())
.map(String::length)
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
Data emission just , the map and filter operator will be executed on the io scheduler as directed by the upstream operator subscribeOn
.
- If only
observeOn
is specified, all operators will be executed on the current thread and only operators below the observeOn
will be switched to thread specified by the observeOn
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
Data emission just and the map will be executed on the currentThread scheduler .
filter will be executed on the computation scheduler as directed by the downstream operator observeOn
.
- if both
subscribeOn
and observeOn
are specified, then all the operators below the observeOn
will be switched to thread specified by the observeOn
and rest all operators above observeOn
are switched to thread specified by subscribeOn
. This holds true in whatever order you specify subscribeOn
and observeOn
Observable
.just("big", "bigger", "biggest")
.subscribeOn(Schedulers.io())
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribe { length -> println("item length $length") }
Data emission just and the map operator will be executed on the io scheduler as directed by the upstream operator subscribeOn
.
filter will be executed on the computation scheduler as directed by the downstream operator observeOn
.
The thread usage would be the same even if subscribeOn would have been called after observeOn.
Observable
.just("big", "bigger", "biggest")
.map(String::length)
.observeOn(Schedulers.computation())
.filter { it > 6 }
.subscribeOn(Schedulers.io())
.subscribe { length -> println("item length $length") }
Observer observe Observable and Observable subscribe Observer
this is incorrect. Rx project's official page says "an observer .. must first subscribe to that Observable". – Plication