RxJava - How to stop (and resume) a Hot Observable (interval)?
Asked Answered
U

2

11

I have the following Hot Observable:

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
                          .map((t) -> getCurrentTimeInMillis()))

However, I can't find a good way to stop it. I was able to partially solve this using takeWhile and a boolean flag (runTimer):

Observable.interval(0L, 1L, TimeUnit.SECONDS)
          .takeWhile((t) -> runTimer)
          .map((t) -> getCurrentTimeInMillis()))

There are 2 things I don't like in this approach though:

  1. I must keep the flag runTimer around, which I don't want.
  2. Once runTimer becomes false, the Observable simply completes, which means if I want to emit again I need to create a new Observable. I don't want that. I just want the Observable stop emitting items until I tell it to start again.

I was hoping for something like this:

hotObservable.stop();
hotObservable.resume();

That way I don't need to keep any flags around and the observable is always alive (it might not be emitting events though).

How can I achieve this?

Undercoating answered 9/12, 2016 at 23:29 Comment(3)
Should the observable stop producing items, or continue operating but stop emitting them?Pierce
@Pierce Stop producing would be ideal, but I'd be happy with stop emitting only as well. Both works in my situation.Undercoating
Kiskae's answer is perfectly valid, but just FYI.. if you would replace takeWhile operator in your previous solution with filter operator, you would get rid of the second disadvantage...Guayule
C
9

One possible approach uses a BehaviorSubject and a switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
    if (on) {
        return Observable.interval(0L, 1L, TimeUnit.SECONDS);
    } else {
        return Observable.never();
    }
}).map((t) -> getCurrentTimeInMillis());

By sending booleans to the subject the output of the observable can be controlled. subject.onNext(true) will cause any observable created using that subject to begin emitting values. subject.onNext(false) disables that flow.

The switchMap takes care of disposing the underlying observables when it is switched off. It also uses distinctUntilChanged to make sure it does not do unnecessary switching.

Chlamys answered 10/12, 2016 at 2:55 Comment(0)
R
1

Mybe you can use this

Observable.interval(3, TimeUnit.SECONDS)
        .takeUntil(stop)
        .subscribe(new MyObserver());

Thread.sleep(10000);
stop.onNext(-1);
Redcap answered 11/6, 2017 at 5:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.