RxJava - Opposite of switchMap() Operator?

I am wondering if there is a way to compose existing operators to perform the opposite of a switchMap().

switchMap() chases the latest emission it receives and cancels any Observable it was previously executing. Suppose I flipped that: I want an xxxMap() operator that ignores all incoming emissions while it is busy with the first emission it received. It keeps ignoring emissions until the Observable it is currently running has finished emitting. Then it processes the next emission it receives.

Observable.interval(1, TimeUnit.SECONDS)
        .doOnNext(i -> System.out.println("Source Emitted Value: " + i))
        .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subscribeOn(Schedulers.computation()))
        .subscribe(i -> System.out.println("Subscriber received Value: " + i));

Is there a way to accomplish this? In the above example, if doIntensiveProcess() were to last three seconds, ignoreWhileBusyMap() would process 0 but likely ignore emissions 1 and 2 coming from interval(). It would then process 3 but likely ignore 4 and 5, and so on...

Christinchristina answered 11/5, 2016 at 19:18 Comment(0)

Sure, gate the processing of a value with a boolean that is set again once the processing has finished:

AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
    if (gate.get()) {
        gate.set(false);

        return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
                .doAfterTerminate(() -> gate.set(true));
    } else {
        return Observable.empty();
    }
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
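
For readers who want to see the gate idea in isolation, here is a minimal sketch using only the JDK, with no RxJava involved. The class `BusyGate` and its methods `tryRun`, `isOpen` and `shutdown` are hypothetical names invented for this illustration. It uses `compareAndSet` instead of a separate `get()` and `set()` because, unlike the flatMap mapper above (which should only be called serially by the operator), `tryRun` might be called from several threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of RxJava: drops work that arrives while busy.
class BusyGate {
    private final AtomicBoolean open = new AtomicBoolean(true);
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Runs the task if the gate is open; returns false (task dropped) if busy. */
    boolean tryRun(Runnable task) {
        // Check and close the gate in one atomic step.
        if (!open.compareAndSet(true, false)) {
            return false;
        }
        worker.execute(() -> {
            try {
                task.run();
            } finally {
                open.set(true); // reopen even if the task throws
            }
        });
        return true;
    }

    boolean isOpen() {
        return open.get();
    }

    void shutdown() {
        worker.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        BusyGate gate = new BusyGate();
        CountDownLatch release = new CountDownLatch(1);

        // First task keeps the gate closed until the latch is released.
        boolean first = gate.tryRun(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second task arrives while the first is still running: dropped.
        boolean second = gate.tryRun(() -> { });

        release.countDown();
        while (!gate.isOpen()) {
            Thread.sleep(1); // wait for the first task to reopen the gate
        }
        // Third task arrives after the gate reopened: accepted.
        boolean third = gate.tryRun(() -> { });
        gate.shutdown();

        System.out.println(first + " " + second + " " + third); // prints "true false true"
    }
}
```

The latch makes the demo deterministic: the second call always lands while the first task is still blocked, so it is always dropped.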

Edit

Alternative:

Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

You can change onBackpressureDrop to onBackpressureLatest to continue immediately with the latest value.
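
To make the difference between the two strategies concrete, here is a rough model on a virtual clock, written in plain Java. It is not RxJava code and does not reproduce the library's exact request timing; the class `DropVersusLatest` and the method `run` are hypothetical names. It assumes the source emits value i at time i * periodMs, that each accepted value keeps the operator busy for processMs, and that when work finishes at the same instant a new value arrives, the finished work is handled first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not an RxJava API: which values get processed under
// "drop while busy" versus "keep only the latest while busy".
class DropVersusLatest {

    static List<Integer> run(int count, long periodMs, long processMs, boolean keepLatest) {
        List<Integer> processed = new ArrayList<>();
        long busyUntil = 0;     // virtual time at which the current work ends
        Integer pending = null; // newest value seen while busy (latest mode only)

        for (int i = 0; i < count; i++) {
            long arrival = i * periodMs;

            // If work ended at or before this arrival, a pending value (latest
            // mode) starts immediately at the moment the work ended.
            if (pending != null && busyUntil <= arrival) {
                processed.add(pending);
                pending = null;
                busyUntil += processMs;
            }

            if (arrival >= busyUntil) {
                processed.add(i);                 // idle: process right away
                busyUntil = arrival + processMs;
            } else if (keepLatest) {
                pending = i;                      // busy: remember newest value
            }                                     // busy in drop mode: discard
        }

        if (pending != null) {
            processed.add(pending);               // flush the last remembered value
        }
        return processed;
    }

    public static void main(String[] args) {
        // Same timings as the answer: 200 ms interval, 500 ms of work.
        System.out.println(run(10, 200, 500, false)); // prints [0, 3, 6, 9]
        System.out.println(run(10, 200, 500, true));  // prints [0, 2, 4, 7, 9]
    }
}
```

In this model, drop mode waits for the next fresh emission after work ends, while latest mode resumes at once with the most recent value it saw, so it never sits idle while a value is pending.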

Lustick answered 11/5, 2016 at 19:30 Comment(5)
Awesome, I did something similar with a Semaphore but was hoping to use a purely reactive composition with existing operators. I suppose I could wrap all this up in a Transformer though. - Christinchristina
Use a deferred transformer to avoid sharing the gate across multiple end-subscribers. - Lustick
Just realized your solution is not block-y like mine though, so I'll switch to this. Thanks! - Christinchristina
Haha yes, I'll keep that in mind this time : ) Won't make that mistake again. - Christinchristina
Thanks for your alternative. I was able to compose it into an extension function for Kotlin: inline fun <T, R> Observable<T>.ignoreWhileBusyMap(crossinline mapper: (T) -> Observable<R>) = onBackpressureDrop().flatMap({ mapper.invoke(it) }, 1) - Christinchristina

I know that this is an old thread, but there is now an RxJS operator that does just that.

The operator is exhaustMap.

According to the docs:

ExhaustMap projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

Docs example:

import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  exhaustMap(ev => interval(1000).pipe(take(5)))
);
result.subscribe(x => console.log(x));
Petticoat answered 28/11, 2019 at 16:0 Comment(2)
This is currently the best answer as ExhaustMap did not exist when the original answer was accepted. - Dochandorrach
ExhaustMap still doesn't exist in RxJava, which is what was originally asked. I'm so disappointed RxJava doesn't have it though, coming from RxJs. - Nopar

To answer Jeopardy style: what is concatMap?

concatMap will subscribe to the first Observable and will not subscribe to subsequent Observables until the previous Observable calls onComplete().

In this respect it is the "opposite" of switchMap which eagerly unsubscribes from previous Observables when a new one comes along.

concatMap wants to hear everything that each Observable has to say, whereas switchMap is a social butterfly, and moves on as soon as another Observable is available.
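
As a rough illustration of the queueing described here, the sketch below models on a virtual clock when each value would start being processed if nothing is ever dropped. It is plain Java, not RxJava; the class `ConcatQueueModel` and the method `startTimes` are hypothetical names. It assumes value i arrives at time i * periodMs and each value takes processMs to handle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not an RxJava API: every value is processed in order,
// and a value that arrives while the previous one is running waits its turn.
class ConcatQueueModel {

    static List<Long> startTimes(int count, long periodMs, long processMs) {
        List<Long> starts = new ArrayList<>();
        long freeAt = 0; // virtual time at which the previous value finishes
        for (int i = 0; i < count; i++) {
            long arrival = i * periodMs;
            long start = Math.max(arrival, freeAt); // queued values wait
            starts.add(start);
            freeAt = start + processMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 1 s interval and 3 s of work, as in the question
        System.out.println(startTimes(4, 1000, 3000)); // prints [0, 3000, 6000, 9000]
    }
}
```

Under these assumptions no value is skipped, but value 3 arrives at 3000 ms and only starts at 9000 ms, so the backlog keeps growing for as long as the source is faster than the processing.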

Insphere answered 15/7, 2016 at 7:9 Comment(4)
Not quite. If you read the question closely, you'll see I'm pursuing a behavior that has little to do with concatMap or its perceived opposite. To follow your analogy, I was looking for an xxxMap operator that would focus on talking to the first Observable it encounters and tell any subsequent Observables "not now, I'm busy talking to this guy". Only when that conversation is done will it allow another Observable to engage with it. - Christinchristina
That's what concatMap does. - Insphere
I believe concatMap() is the same as flatMap() but it doesn't interleave. It guarantees all emissions will ultimately be emitted even if it queues them. But the operator I was asking for simply ignores subsequent emissions while it is busy. - Christinchristina
Ah, ok, by "ignore emissions" I thought you meant "ignore emissions for now", but as is spelled out in the last paragraph of your question, these emissions should be dropped. In that case, I like the solution akarnokd posed. - Insphere
