Switching threads multiple times in Rx chain
Asked Answered
D

2

15

Let's assume I have a following case for Android:

  1. Request list of groups from network
  2. Show some UI elements for each group
  3. Request items for each group
  4. Show UI elemets for each item

I want to do this using RxJava:

webService.requestGroups()
        .flatMap(group -> {
            view.showGroup(group);
            return webService.requestItems(group);
        })
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(items -> view.showItems(items));

As you can see I have 2 calls for view objects, each of them must be executed on main thread. And 2 calls for webService, which must be executed on background thread.

The problem with this code: first call to view will be executed on background which cause an Android RuntimeException (Only original thread may touch views or something) If I transfer .observeOn to the beginning of chain - second webService call will be executed in main thread.

How can I "swim" through threads multiple times in RxJava chain?

Domesticate answered 6/9, 2017 at 8:23 Comment(0)
J
19

From the Rx doc for SubscribeOn:

The SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

The SubscribeOn operator can only be applied once and sets the starting thread. ObserveOn can be used to go from one thread to another at any point in the stream. So I think the following should do what you want:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(group -> {
        view.showGroup(group);
        return webService.requestItems(group)
                         .subscribeOn(Schedulers.io());
    })
    .toList()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(items -> view.showItems(items));

But in my opinion this is too complicated. I would just subscribe to the first observable, and then start a new chain for each group, like this:

webService.requestGroups()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(group -> { 
        view.showGroup(group);
        webService.requestItems(group)
            .subscribeOn(Schedulers.io()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(items -> view.showItems(items));
    });
Joacima answered 6/9, 2017 at 8:52 Comment(2)
When would we call observeOn before calling subscribeOn?Incubator
@Incubator position of the subscribeOn() does not matter in the chain -- the first subscribeOn() will be the thread that the entire chain uses for starting the subscription (whether it's the first chained method call, or the last). In contrast, observeOn() changes the thread for everything that comes after that point in the chain. But note that in Samuel's 2nd code block, there are two completely separate Rx chains happening. The requestItems() chain is not mapped into the requestGroups() chain.Breazeale
S
14

Building on Samuel's answer, you could do it with an even simpler, non-nested syntax:

webService.requestGroups()
.subscribeOn(Schedulers.io()) // the first operator (requestGroups) on the IO thread
.observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
.map(group -> {
    view.showGroup(group);
    return group;
})
.observeOn(Schedulers.io()) //everything below on the IO thread
.flatMap(group -> {
    return webService.requestItems(group);
})
.toList()
.observeOn(AndroidSchedulers.mainThread()) //everything below on the main thread
.subscribe(items -> view.showItems(items));

Two rules of thumb here:

  1. subscribeOn dictates on which thread the observable will begin executing, its placement in the chain is irrelevant and it should appear only once.
  2. observeOn tells on which thread all subsequent operators will execute (until another observeOn is encountered); it may appear multiple times in the chain, changing execution thread of different code pieces (like in the example above).
Shankle answered 18/3, 2019 at 17:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.