RxJava Observable get notified on first emission
S

4

13

I have three Observables which I combine with combineLatest:

    Observable<String> o1 = Observable.just("1");
    Observable<String> o2 = Observable.just("2");
    Observable<String> o3 = Observable.just("3");

    Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
        @Override
        public Object call(String s, String s2, String s3) {
            return null;
        }
    });

I want to be notified about the first emission of one of the Observables without dropping the later emissions, which is what I assume the first operator would do. Is there a convenient operator for that, something like this (hypothetical example):

    o1.doOnFirst(new Func1<String, Void>() {
        @Override
        public Void call(String s) {
            return null;
        }
    })
Styliform answered 22/11, 2015 at 20:54 Comment(0)
D
14

I think you can have a practical doOnFirst with a simple take if you're handling a stream:

public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
    return source.take(1).doOnNext(action).concatWith(source);
}

This way the action is only bound to the first item.

This could be changed to handle observables which are not backed by streams by adding skip(1), which skips the item that was already taken:

public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
    return source.take(1).doOnNext(action).concatWith(source.skip(1));
}
Disestablish answered 22/11, 2015 at 22:48 Comment(2)
Requiring two subscriptions to source is potentially inefficient (we don't know what overhead that could involve for any particular observable). I'd suggest going with an answer that uses one subscription for general use. With a hot source you could miss emissions too. - Retread
I applied this to a pull-to-refresh API call, and the API ended up being called twice. The use of the take operator appears to be the cause. - Passerine
E
7

For convenience, I created these extension functions for Flowable and Observable.
Note that with doOnFirst() the action is called before the first element is emitted downstream, whereas doAfterFirst() emits the first item and then performs the action.

fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))

Usage is as simple as this:

Flowable.fromArray(1, 2, 3)
            .doOnFirst { System.err.println("First $it") }
            .subscribe { println(it) }

Output:

// First 1  
// 1  
// 2  
// 3

And:

Flowable.fromArray(1, 2, 3)
            .doAfterFirst { System.err.println("First $it") }
            .subscribe { println(it) }

Output:

// 1  
// First 1
// 2  
// 3
Efren answered 18/10, 2019 at 12:7 Comment(1)
Be careful using "skip(1)" with Subjects: the stream then terminates after the first emitted value. You can use "this" instead. - Highspeed
S
6

There are a couple of solutions I can think of. The first one is an ugly but simple hack of doOnNext. Just add a boolean field to the Action1 indicating whether the first item has been received. Once received, do whatever it is you want to do, and flip the boolean. For example:

Observable.just("1").doOnNext(new Action1<String>() {

    boolean first = true;

    @Override
    public void call(String t) {
        if (first) {
            // Do something with the first item only
            first = false;
        }
    }
});

The second one is to subscribe twice to the observable you want to monitor, using publish() or share() (depending on whether you want to connect to the published observable manually), with one of those subscriptions going through first(). You'll end up with two separate observables that emit the same items, except that the one going through first() stops after the first emission:

ConnectableObservable<String> o1 = Observable.just("1").publish();

o1.first().subscribe(System.out::println); // Subscribed only to the first item
o1.subscribe(System.out::println); // Subscribed to all items

o1.connect(); //Connect both subscribers
Surprint answered 22/11, 2015 at 22:5 Comment(1)
Your first suggestion will only work once. Better to wrap with defer to hold the boolean state so you get the behaviour on each subscription. - Retread
R
4

Using rxjava-extras:

observable
  .compose(Transformers.doOnFirst(System.out::println))

It's unit tested and under the covers just uses a per-subscription counter in an operator. Note that per-subscription is important, as there are plenty of use cases where an observable instance gets used more than once and we want the doOnFirst operator to apply each time.
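To illustrate why the state has to be per-subscription, here is a minimal sketch in plain Java with no RxJava dependency. It is not the rxjava-extras source: the class DoOnFirstSketch and its doOnFirst, subscribe and demo methods are hypothetical names made up for this illustration, and a List stands in for the observable. The idea is that a fresh counter is created each time a "subscription" starts, so the action fires on the first item of every subscription rather than only once overall.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of RxJava or rxjava-extras.
final class DoOnFirstSketch {

    // Returns a factory. Each get() builds a callback with its own counter,
    // so the "is this the first item?" check restarts for every subscription.
    static <T> Supplier<Consumer<T>> doOnFirst(Consumer<? super T> action) {
        return () -> {
            AtomicLong seen = new AtomicLong();
            return item -> {
                if (seen.getAndIncrement() == 0) {
                    action.accept(item); // runs for the first item only
                }
            };
        };
    }

    // Stand-in for one subscription: asks the factory for a fresh callback
    // and feeds it every item of the source.
    static <T> void subscribe(List<T> items, Supplier<Consumer<T>> perSubscription) {
        Consumer<T> onNext = perSubscription.get();
        items.forEach(onNext);
    }

    // Reuses the same hook for two "subscriptions" and records
    // which items triggered the action.
    static List<String> demo() {
        List<String> firsts = new ArrayList<>();
        Supplier<Consumer<String>> hook = doOnFirst(firsts::add);
        subscribe(Arrays.asList("1", "2", "3"), hook);
        subscribe(Arrays.asList("a", "b"), hook);
        return firsts;
    }
}
```

If the counter were a single field shared by all subscriptions, as in the boolean-flag hack, the second subscription in demo() would never trigger the action.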

Source code is here.

Retread answered 23/11, 2015 at 19:50 Comment(2)
Oh snap, even more RxJava! ;) Thanks. - Styliform
More RxGoodies! Hooray - Frederico
