Rxandroid What's the difference between SubscribeOn and ObserveOn
H

6

106

I am just learning RxJava and RxAndroid 2, and I am confused about the major difference between subscribeOn and observeOn.

Hemiplegia answered 8/7, 2017 at 9:41 Comment(0)
T
86

In case you find the above answer full of jargon:

tl;dr

 Observable.just("Some string")                 
           .map(str -> str.length())              
           .observeOn(Schedulers.computation())   
           .map(length -> 2 * length)   
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.io())
           .subscribe(---)

Create an Observable and perform the first map on an IO thread (since we "subscribeOn" that scheduler). Then switch to a computation thread and perform map(length -> 2 * length). Finally, make sure the output is observed on the main thread (the last observeOn()).

Anyway,

observeOn() simply changes the thread for all operators further downstream. A common misconception is that observeOn also acts upstream, but it doesn't.

The example below makes this clearer:

Observable.just("Some string")                  // UI
       .map(str -> str.length())               // UI
       .observeOn(Schedulers.computation())   // Changing the thread
       .map(length -> 2 * length)            // Computation
       .subscribe(---)                       // Computation

subscribeOn() only influences the thread on which the Observable is subscribed to; the emissions then stay on that thread downstream (until an observeOn changes it).

Observable.just("Some String")              // Computation
  .map(str -> str.length())                // Computation
  .map(length -> 2 * length)              // Computation
  .subscribeOn(Schedulers.computation()) // -- changing the thread
  .subscribe(number -> Log.d("", "Number " + number)); // Computation

Position does not matter (subscribeOn())

Why? Because it only affects the moment of subscription. (This holds for a single subscribeOn call; with several calls, position starts to matter.)

Methods that obey the contract with subscribeOn

-> Basic example: Observable.create

All the work specified inside the create body will run on the thread specified in subscribeOn.

Another example: Observable.just, Observable.from or Observable.range

Note: All of those methods accept ready-made values, so do not call blocking methods to produce those values; the arguments are evaluated before subscription, so subscribeOn won't affect them.

If you want to use blocking functions, use

Observable.defer(() -> Observable.just(blockingMethod()));
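
The difference between passing a ready-made value and deferring the call can be seen without RxJava at all. The following is a minimal stdlib-only sketch, not RxJava code: DeferSketch, blockingMethod and the "worker" executor are hypothetical names. The executor stands in for the scheduler given to subscribeOn, and a Callable stands in for Observable.defer.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DeferSketch {

    /** Hypothetical blocking call; it only reports which thread evaluated it. */
    static String blockingMethod() {
        return Thread.currentThread().getName();
    }

    /** Returns { thread that ran the eager call, thread that ran the deferred call }. */
    static String[] run() {
        // Assumption: this executor plays the role of the scheduler passed to subscribeOn.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // Like Observable.just(blockingMethod()): the argument is evaluated right here,
            // on the calling thread, before anything is handed to the worker.
            String eagerValue = blockingMethod();
            Callable<String> eager = () -> eagerValue;

            // Like Observable.defer(...): only a recipe is handed over, so the call
            // happens on whichever thread eventually runs it.
            Callable<String> deferred = DeferSketch::blockingMethod;

            return new String[] { worker.submit(eager).get(), worker.submit(deferred).get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("eager call ran on:    " + threads[0]);
        System.out.println("deferred call ran on: " + threads[1]);
    }
}
```

The eager call reports the calling thread, while the deferred call reports "worker". This is the same reason a blocking argument to Observable.just is not moved by subscribeOn.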

Important Fact:

subscribeOn does not work with Subjects

Multiple subscribeOn:

If there are multiple instances of subscribeOn in the stream, only the first one (the one closest to the source) has a practical effect on where the source emits.
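
To make this concrete, here is a small stdlib-only model of two stacked subscribeOn calls. It is a sketch under stated assumptions, not RxJava itself: Source, subscribeOn, logSubscription and the executor names "first" and "second" are all hypothetical, and real operators do more work than this. "first" is the call closest to the source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class MultipleSubscribeOnSketch {

    /** Hypothetical stand-in for an Observable: it emits when subscribe() is called. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Models subscribeOn: the upstream subscribe() call is moved onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Pass-through stage that records the thread on which it gets subscribed. */
    static <T> Source<T> logSubscription(Source<T> upstream, List<String> log) {
        return observer -> {
            log.add("middle stage subscribed on " + Thread.currentThread().getName());
            upstream.subscribe(observer);
        };
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Builds source -> subscribeOn(first) -> middle stage -> subscribeOn(second). */
    static List<String> run() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source = observer -> {
            log.add("source emits on " + Thread.currentThread().getName());
            observer.accept("value");
        };

        Source<String> chain =
                subscribeOn(logSubscription(subscribeOn(source, first), log), second);

        chain.subscribe(value -> {
            log.add("observer receives on " + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the source emits on "first", so the call closest to the source decides where emissions start. The subscription-time work of the stage between the two calls still runs on "second", which is why a later subscribeOn is not entirely without effect.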

Subscribe & subscribeOn

People think that subscribeOn has something to do with Observable.subscribe, but it doesn't have anything special to do with it. It only affects the subscription phase.

Source : Tomek Polański (Medium)

Torrietorrin answered 29/3, 2018 at 11:35 Comment(8)
I'm confused. I thought I got it, but why does the answer below say that position also matters for subscribeOn? Which one is true? - Orthopter
@Orthopter Both answers are somewhat correct. The position of subscribeOn matters if you use multiple subscribeOn calls; with a single call, position does not matter. - Torrietorrin
If I do some UI work inside subscribe(), does that block my UI? I used subscribeOn(Schedulers.io()) but I am not sure. Also, does Observable.defer() block the main thread? - Apostles
@Apostles To perform a UI task you have to switch to the main thread, and if you perform heavy computation on the main thread it will freeze the UI. That's why we usually do the work on any thread other than AndroidSchedulers.mainThread() and bring the result to the main thread JUST to update the UI. - Torrietorrin
@Apostles As for Observable.defer(), it has nothing to do with blocking the main thread; that depends on the Scheduler you are using. All defer() does is postpone creating the Observable until an observer subscribes, and it creates a fresh Observable for each observer. - Torrietorrin
@Dennis Thank you so much for your answer. I have many questions about these things. For example, if I do some UI work inside the subscribe() method, would that block the UI even though I subscribeOn Schedulers.io() and observeOn Schedulers.io() too? Thanks - Apostles
@Apostles You won't be able to perform any UI task on Schedulers.io(); you have to switch to the main thread. Otherwise it will throw an exception! - Torrietorrin
@Dennis Actually it does not throw an exception. I think it works on the main thread, so subscribeOn() and observeOn() work for the flatMap() and map() functions, since those are followed by them. - Apostles
M
137

subscribeOn specifies the Scheduler on which an Observable will operate. observeOn specifies the Scheduler on which an observer will observe this Observable.

So basically, with subscribeOn the work is usually subscribed (executed) on a background thread (you do not want to block the UI thread while waiting for the Observable), and with observeOn you typically observe the result on the main thread.

If you are familiar with AsyncTask, then subscribeOn is similar to the doInBackground method and observeOn to onPostExecute.

Matrilineage answered 8/7, 2017 at 10:39 Comment(0)
A
73

Summary

  • Use observeOn to set threads for callbacks "further down the stream (below it)", such as code blocks inside doOnNext or map.
  • Use subscribeOn to set threads for initializations "upstream (above it)", such as doOnSubscribe, Observable.just or Observable.create.
  • Both methods can be called multiple times, with each call overwriting previous ones. Position matters.

Let's walk through this topic with an example: we want to find the length of the string "user1032613". This is not an easy task for computers, so it's only natural that we perform the intense calculation in a background thread, to avoid freezing the app.

observeOn

We can call observeOn as many times as we like, and it controls which thread all callbacks below it will run on. It's easy to use, and works just as you'd expect.

For example, we will show a progress bar on the main UI thread, then do intensive/blocking operations in another thread, then come back to the main UI thread to update the result:

    Observable.just("user1032613")

            .observeOn(mainThread) // set thread for operation 1
            .doOnNext {
                /* operation 1 */
                print("display progress bar")
                progressBar.visibility = View.VISIBLE
            }

            .observeOn(backThread) // set thread for operation 2 and 3
            .map {
                /* operation 2 */
                print("calculating")
                Thread.sleep(5000)
                it.length
            }

            .doOnNext {
                /* operation 3 */
                print("finished calculating")
            }

            .observeOn(mainThread) // set thread for operation 4
            .doOnNext {
                /* operation 4 */
                print("hide progress bar and display result")
                progressBar.visibility = View.GONE
                resultTextView.text = "There're $it characters!"
            }

            .subscribe()

In the above example, /* operation 1 */ runs on mainThread because we set it using observeOn(mainThread) on the line right above it. Then we switch to backThread by calling observeOn again, so /* operation 2 */ runs there. Because we didn't change the thread before chaining /* operation 3 */, it runs on the back thread as well, just like /* operation 2 */. Finally, we call observeOn(mainThread) again to make sure /* operation 4 */ updates the UI from the main thread.

subscribeOn

So we've learned observeOn sets threads for subsequent callbacks. What else are we missing? Well, the Observable itself, and its methods such as just(), create(), subscribe() and so on, are also code that needs to be executed. This is how objects are passed along the stream. We use subscribeOn to set threads for code related to Observable itself.

If we remove all the callbacks (controlled by observeOn discussed earlier), we are left with the "skeleton code" that will, by default, run on whichever thread the code is written in (probably main thread):

    Observable.just("user1032613")
            .observeOn(mainThread)
            .doOnNext {
            }
            .observeOn(backThread)
            .map {
            }
            .doOnNext {
            }
            .observeOn(mainThread)
            .doOnNext {
            }
            .subscribe()

If we aren't happy about this empty skeleton code running on main thread, we can use subscribeOn to change it. For example, maybe the first line Observable.just("user1032613") isn't as simple as creating a stream from my user name - maybe it's a string from the Internet, or perhaps you are using doOnSubscribe for some other intensive operations. In that case, you can call subscribeOn(backThread) to put some of the code in another thread.

Where to put subscribeOn

At the time of writing this answer, there are some misconceptions saying "only call it once", "position does not matter", and "if you call it multiple times, only the first time counts". After a lot of research and experimentation, it turns out subscribeOn can be usefully called multiple times.

Because Observable uses Builder Pattern (fancy name for "chaining methods one after another"), subscribeOn is applied in reverse order. Therefore, this method sets the thread for code above it, exactly the opposite of observeOn.

We can test this using the doOnSubscribe method. This method is triggered on the subscription event, and it runs on the thread set by subscribeOn:

    Observable.just("user1032613")
            .doOnSubscribe {
                print("#3 running on main thread")
            }
            .subscribeOn(mainThread) // set thread for #3 and just()
            .doOnNext {
            }
            .map {
            }
            .doOnSubscribe {
                print("#2 running on back thread")
            }
            .doOnNext {
            }
            .subscribeOn(backThread) // set thread for #2 above
            .doOnNext {
            }
            .doOnSubscribe {
                print("#1 running on default thread")
            }
            .subscribe()

It might be easier to follow the logic if you read the above example from bottom to top, just like how the Builder Pattern executes the code.

In this example, the first line Observable.just("user1032613") runs on the same thread as print("#3") because there are no other subscribeOn calls between them. This creates the illusion of "only the first call matters" for people who only care about code inside just() or create(). This quickly falls apart once you start doing more.


Footnote:

Threads and print() functions in the examples are defined, for brevity, as follows:

val mainThread = AndroidSchedulers.mainThread()
val backThread = Schedulers.computation()
private fun print(msg: String) = Log.i("", "${Thread.currentThread().name}: $msg")
Austreng answered 21/11, 2018 at 1:46 Comment(5)
I was the one who mentioned that "subscribeOn() works only once" in my answer (to the same question). Looking at your explanation, it does seem to make sense. Can I use a bit of your explanation to update my answer? - Torrietorrin
Thanks. A follow-up question: if observeOn controls the code below it, and subscribeOn controls the code above it, which thread would a code block follow if it has observeOn above it and subscribeOn below it? Sorry, just a bit confused. - Orthopter
@Orthopter subscribeOn sets the thread for the framework "skeleton" itself, and observeOn sets the thread for the callback functions/code blocks. They are not controlling the same thing. In your case, a "code block" runs on whichever thread observeOn set it to be. - Austreng
Great response with example code. For completeness, can you add code in subscribe() and explain which thread it runs on and how subscribeOn and observeOn affect it? - Mondragon
This is not the behavior I see for RxJava 3. - Solstice
L
22

If someone finds the RxJava description hard to understand (as I did, for example), here is a pure Java explanation:

subscribeOn()

Observable.just("something")
  .subscribeOn(Schedulers.newThread())
  .subscribe(...);

is roughly equivalent to:

Observable<String> observable = Observable.just("something");
new Thread(() -> observable.subscribe(...)).start();

Because an Observable emits values on subscribe(), and here subscribe() runs in a separate thread, the values are also emitted in the same thread as subscribe(). This is why it works both "upstream" (it influences the thread for the previous operations) and "downstream".

observeOn()

Observable.just("something")
  .observeOn(Schedulers.newThread())
  .subscribe(...);

is roughly equivalent to:

Observable.just("something")
  .subscribe(it -> new Thread(() -> ...).start());

Here the Observable emits values on the calling (main) thread; only the listener method is executed in a separate thread.
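
The two equivalences above can be combined into one runnable sketch that uses only the Java standard library. This is a simplified model rather than how RxJava is implemented: Source, subscribeOn, observeOn and the executor names "io" and "ui" are hypothetical stand-ins chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ThreadHopSketch {

    /** Hypothetical stand-in for an Observable: it emits when subscribe() is called. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Models subscribeOn: the subscribe() call itself is moved onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Models observeOn: the source is untouched, only the delivery hops threads. */
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return observer ->
                upstream.subscribe(value -> executor.execute(() -> observer.accept(value)));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Runs source -> subscribeOn(io) -> observeOn(ui) and records where each step ran. */
    static List<String> run() {
        ExecutorService io = named("io");
        ExecutorService ui = named("ui");
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source = observer -> {
            log.add("emit on " + Thread.currentThread().getName());
            observer.accept("something");
        };

        observeOn(subscribeOn(source, io), ui).subscribe(value -> {
            log.add("receive on " + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "emit on io" followed by "receive on ui": the subscribeOn stand-in decides where the source emits, and the observeOn stand-in only moves the delivery to the observer.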

Lowpitched answered 22/5, 2019 at 18:7 Comment(0)
H
4

When you subscribe to an observable, a flow starts that works its way up to the top of the chain and then back down again. The subscribe part is relevant to the upward chaining and the observe part is relevant to the downward chaining.

Once the top of the chain is reached, the subscription phase has essentially completed. Events start to be emitted and the downward chain of maps, filters, etc. is invoked.

SubscribeOn influences subscription calls above its placement, for example doOnSubscribe.

ObserveOn influences observation calls below its placement, for example doOnNext, map, flatMap, etc.

Both will change the thread that is used to continue the flow either upward or downward.

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class SubscribeVsObserveOn {

    public static void main(String[] args) throws InterruptedException {

        System.out.println("Ordinal 0: " + Thread.currentThread().getName());

        final CountDownLatch latch = new CountDownLatch(1);

        Observable
            .just("a regular string.")
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 2: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 3: " + Thread.currentThread().getName()))
            .map(s -> s)
            .doOnSubscribe(disposable ->
                System.out.println("Ordinal 1: " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .doOnNext(s ->
                System.out.println("Ordinal 4: " + Thread.currentThread().getName()))
            .map(s -> s)
            .subscribe(s -> latch.countDown());

        latch.await();
    }
}

Here is the output:

Ordinal 0: main
Ordinal 1: RxNewThreadScheduler-1
Ordinal 2: RxNewThreadScheduler-2
Ordinal 3: RxNewThreadScheduler-3
Ordinal 4: RxNewThreadScheduler-4
Hanni answered 27/7, 2021 at 13:34 Comment(0)
E
3

This answer is nothing new; I just want to clarify a little bit more.

  1. Let's assume that we have two threads.

     val pool1 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 1") }
     val pool2 = Executors.newCachedThreadPool { runnable -> Thread(runnable, "Thread 2") }
    

  2. As the answers above describe, observeOn sets the thread for downstream and subscribeOn sets it for upstream. But what if both of them are used? To check this, I added logging line by line.

    Observable.just("what if use both")
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe A " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext A " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map A " + Thread.currentThread().name)
         it + " A"
     }
    
     // observeOn
     .observeOn(Schedulers.from(pool1)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe B " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext B " + Thread.currentThread().name) }
     .map {
         Log.d("Thread", "both, map B " + Thread.currentThread().name)
         it + " B"
     }
    
     // subscribeOn
     .subscribeOn(Schedulers.from(pool2)) 
    
     .doOnSubscribe { Log.d("Thread", "both, doOnSubscribe C " + Thread.currentThread().name) }
     .doOnNext { Log.d("Thread", "both, doOnNext C " + Thread.currentThread().name) }
     .map {
        Log.d("Thread", "both, map C " + Thread.currentThread().name)
        it + " C"
      }
    
     // observeOn main
     .observeOn(AndroidSchedulers.mainThread())
     .doOnNext { Log.d("Thread", "main " + Thread.currentThread().name) }
     .subscribe(
         { result -> Log.d("Thread", "main subscribe " + Thread.currentThread().name)}
         , { error -> {} }
         )
    

The result is:

both, doOnSubscribe C main
both, doOnSubscribe A Thread 2
both, doOnSubscribe B Thread 2

both, doOnNext A Thread 2
both, map A Thread 2

both, doOnNext B Thread 1
both, map B Thread 1

both, doOnNext C Thread 1
both, map C Thread 1

main main
main subscribe main
result: what if use both A B C

As you can see, the doOnSubscribe callbacks ran first, before any item was emitted. doOnSubscribe C, which sits below subscribeOn, ran on the calling (main) thread, while A and B, which sit above it, ran on Thread 2. That means the subscription is handled before the other operators, so the first thread that handled the upstream code was Thread 2.

Then the other operators were called, line by line. After observeOn, the thread changed to Thread 1. Then, just before subscribe, observeOn was called again to change the thread to the main thread. (Don't worry about AndroidSchedulers; it is just another kind of scheduler.)

TL;DR;

  • First pass: subscribeOn takes effect first, from bottom to top.
  • Second pass: observeOn takes effect, from top to bottom, along with the other operators.
  • The behavior was the same on both RxJava 2 and RxJava 3.
Exosmosis answered 2/10, 2020 at 6:57 Comment(1)
"doOnSubscribe called first, from bottom to top." -> the doOnSubscribe output you show is C -> A -> B. But if "bottom to top" is accurate, shouldn't the output be C -> B -> A, since that's the bottom-to-top order in the code? - Lavena
