RxJava `Completable.andThen` is not executing serially?

I have a use case where I initialize some global variables in a Completable, and in the next step of the chain (using the andThen operator) I make use of those variables.

The following sample explains my use case in detail.

Say you have a class User:

        class User {
            String name;
        }

and I have an Observable like this:

        private User mUser; // this is a global variable

        public Observable<String> stringObservable() {
            return Completable.fromAction(() -> {
                mUser = new User();
                mUser.name = "Name";
            }).andThen(Observable.just(mUser.name));
        }           

First I do some initialization in Completable.fromAction, and I expect the andThen operator to start only after Completable.fromAction has completed.

That means I expect mUser to be initialized by the time the andThen operator starts.

The following is my subscription to this observable:

        stringObservable()
                .subscribe(s -> Log.d(TAG, "success: " + s),
                        throwable -> Log.e(TAG, "error: " + throwable.getMessage()));

But when I run this code, I get an error:

          Attempt to read from field 'java.lang.String User.name' on a null object reference

which means mUser is null, so andThen started before the code in Completable.fromAction was executed. What's happening here?

According to the documentation of andThen:

Returns an Observable which will subscribe to this Completable and once that is completed then will subscribe to the {@code next} ObservableSource. An error event from this Completable will be propagated to the downstream subscriber and will result in skipping the subscription of the Observable.

Eyebolt answered 5/2, 2018 at 14:32 Comment(0)

The issue is not with andThen but with the statement Observable.just(mUser.name) inside andThen. The just operator will try to create the observable immediately, although it will emit only after Completable.fromAction completes.

The problem here is that, at the time the Observable is being created with just, mUser is still null.

Solution: You need to defer the creation of the String Observable until it is subscribed to, which andThen does only after the upstream Completable completes.

Instead of andThen(Observable.just(mUser.name));

use

 andThen(Observable.defer(() -> Observable.just(mUser.name)));

Or

 andThen(Observable.fromCallable(() -> mUser.name));
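
To make the difference concrete without pulling in RxJava, here is a minimal plain-Java analogy. It is only a sketch: `just`, `runThen`, `init`, and the class name are hypothetical stand-ins written for this illustration, not RxJava APIs. `just(value)` models an operator that captures its value when it is called, and a lambda models what `defer` or `fromCallable` give you.

```java
import java.util.function.Supplier;

// Plain-Java analogy only: these helpers are hypothetical stand-ins, not RxJava classes.
class DeferredValueDemo {
    static class User {
        String name;
    }

    static User mUser; // null until init() runs, like the field in the question

    static void init() {
        mUser = new User();
        mUser.name = "Name";
    }

    // Stand-in for Observable.just(value): the value is fixed when just(...) is called.
    static Supplier<String> just(String value) {
        return () -> value;
    }

    // Stand-in for completable.andThen(next): runs the action, then reads from next.
    static String runThen(Runnable action, Supplier<String> next) {
        action.run();
        return next.get();
    }

    // Eager: mUser.name is evaluated while building the arguments, before init() runs.
    static String eager() {
        mUser = null;
        try {
            return runThen(DeferredValueDemo::init, just(mUser.name));
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    // Deferred: the lambda body runs inside runThen, after init() has set mUser.
    static String deferred() {
        mUser = null;
        return runThen(DeferredValueDemo::init, () -> mUser.name);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager());
        System.out.println("deferred: " + deferred());
    }
}
```

In this sketch the eager variant fails before `runThen` is even entered, because Java evaluates `mUser.name` while preparing the method arguments. The deferred variant reads the field only after the action has run.
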
Sarina answered 5/2, 2018 at 14:35 Comment(3)
Observable.just(T) docs state: "Note that the item is taken and re-emitted as is and not computed by any means by just. Use fromCallable(Callable) to generate a single item on demand (when Observers subscribe to it)." - Neoteric
Yes, the same is mentioned in the answer. - Sarina
Quoting the docs directly in the answer would be nice to have, however. - Justifiable

I don't think @Sarath Kn's answer is 100% correct. Yes, just will create the observable as soon as it's called, but andThen is still calling just at an unexpected time.

We can compare andThen with flatMap to get a better understanding. Here is a fully runnable test:

package com.example;

import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;

public class ExampleTest {

    @Test
    public void createsIntermediateObservable_AfterSubscribing() {
        Observable<String> coldObservable = getObservableSource()
                .flatMap(integer -> getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating observable source
        Cold obs created... subscribing
        Emitting 1,2,3
        Creating intermediate observable
        Creating intermediate observable
        Creating intermediate observable
        Emitting complete notification

        IMPORTANT: see that intermediate observables are created AFTER subscribing
         */
    }

    @Test
    public void createsIntermediateObservable_BeforeSubscribing() {
        Observable<String> coldObservable = getCompletableSource()
                .andThen(getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating completable source
        Creating intermediate observable
        Cold obs created... subscribing
        Emitting complete notification

        IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
         */
    }

    private Observable<Integer> getObservableSource() {
        System.out.println("Creating observable source");
        return Observable.create(emitter -> {
            System.out.println("Emitting 1,2,3");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }

    private Observable<String> getIntermediateObservable() {
        System.out.println("Creating intermediate observable");
        return Observable.just("A");
    }

    private Completable getCompletableSource() {
        System.out.println("Creating completable source");
        return Completable.create(emitter -> {
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }
}

You can see that when we use flatMap, just is called after subscribing, which makes sense. If the intermediate observable depended on the items emitted to flatMap, then of course the system couldn't create the intermediate observable before subscription, because it would not yet have any values. You can imagine this wouldn't work if flatMap called just before subscribing:

.flatMap(integer -> getIntermediateObservable(integer))

What is weird is that andThen is able to create its inner observable (i.e. call just) before subscribing. It makes sense that it can do this: the only thing andThen is going to receive is a complete notification, so there is no reason NOT to create the intermediate observable early. The only problem is that it's not the expected behavior.
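
One way to see where this ordering comes from is that Java evaluates a method's arguments before invoking the method, whereas a lambda body runs only when the lambda is invoked. The sketch below reproduces the two log orders in plain Java. `takesValue`, `takesFunction`, and `createIntermediate` are hypothetical stand-ins for an operator that accepts a ready-made source (like `andThen(next)`), an operator that accepts a function (like `flatMap(mapper)`), and `getIntermediateObservable()`; none of them are RxJava APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogy only: hypothetical stand-ins, not RxJava operators.
class EvaluationOrderDemo {
    static final List<String> log = new ArrayList<>();

    // Stand-in for getIntermediateObservable(): records when it is called.
    static String createIntermediate() {
        log.add("Creating intermediate");
        return "A";
    }

    // Accepts an already-built value, like andThen(next).
    static Supplier<String> takesValue(String next) {
        return () -> {
            log.add("Subscribed");
            return next;
        };
    }

    // Accepts a function, like flatMap(mapper): the mapper runs only on demand.
    static Supplier<String> takesFunction(Supplier<String> mapper) {
        return () -> {
            log.add("Subscribed");
            return mapper.get();
        };
    }

    static List<String> eagerOrder() {
        log.clear();
        // The argument expression is evaluated right here, during assembly.
        Supplier<String> chain = takesValue(createIntermediate());
        log.add("Chain assembled");
        chain.get(); // plays the role of subscribing
        return new ArrayList<>(log);
    }

    static List<String> lazyOrder() {
        log.clear();
        // Only the lambda object is created here; its body has not run yet.
        Supplier<String> chain = takesFunction(() -> createIntermediate());
        log.add("Chain assembled");
        chain.get(); // plays the role of subscribing
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(eagerOrder());
        System.out.println(lazyOrder());
    }
}
```

In the eager case "Creating intermediate" is logged before "Chain assembled", matching the andThen logs in the test above. In the lazy case it is logged last, matching the flatMap logs.
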

@Sarath Kn's solution is correct, but for the wrong reason. If we use defer we can see things working as expected:

@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() {
    Observable<String> coldObservable = getCompletableSource()
            .andThen(Observable.defer(this::getIntermediateObservable))
            .subscribeOn(Schedulers.trampoline())
            .observeOn(Schedulers.trampoline());
    System.out.println("Cold obs created... subscribing");
    TestObserver<String> testObserver = coldObservable.test();
    testObserver.awaitTerminalEvent();

    /*
    Resulting logs:

    Creating completable source
    Cold obs created... subscribing
    Emitting complete notification
    Creating intermediate observable

    IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
     */
}
Kolodgie answered 26/2, 2019 at 16:49 Comment(1)
I think this answer is correct but quite long. All we need to know is that andThen takes a completable and returns it (or takes a single and returns that). If you do func1(func2()), func2 will not be called in func1's scope; it will be called in the caller's scope. Sarah is wrong to relate this to .just. - Cackle
