RxJava only check the first response item with timeout
Asked Answered
K

5

15

I see that ReactiveX (RxJava) has an operator timeout, which will apply to every item in a subscription stream. But I only want to check the very first response with a timeout and do not care about timeouts for the following responses. How can I implement this requirement elegantly with RxJava's operators?

Keynes answered 3/9, 2016 at 7:19 Comment(0)
W
5

Best option is to use a timeout overload which returns a timeout observable for every item, and has one for the subscription as well (which is the one you are interested in).

observable.timeout(
  () -> Observable.empty().delay(10, TimeUnit.SECONDS),
  o -> Observable.never()
)

I'll explain, the first func0 will run on subscribe, and will emit an empty observable (which emits complete) delayed by the time you want. if the time passes before any item arrived there will be a timeout like you wanted. the second parameter func1 will decide timeouts between items, which you have no use for so we just passes never (which does not complete or do anything)

Another option is following Luciano suggestion, you can do it like this:

    public static class TimeoutFirst<T> implements Transformer<T,T> {

    private final long timeout;
    private final TimeUnit unit;

    private TimeoutFirst(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public Observable<T> call(Observable<T> observable) {
        return Observable.amb(observable,
                Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " "  + unit.name()))));
    }
}

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) {
    return new TimeoutFirst<>(timeout, seconds);
}

which is a pretty neat solution using amb.

Winged answered 18/1, 2018 at 16:15 Comment(0)
S
3

Here is a more functional way of doing it. It's in Scala but should be transcribed to Java:

val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!"))

myStream amb myTimeout

The amb operator returns the value of the observable that emits first.

Separate answered 13/12, 2016 at 9:22 Comment(0)
W
2

One way to do it is as follows:

Observable<Response> respStream = respStream();
ConnectableObservable<Response> sharedRespStream = respStream.publish();

Observable<String> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS);
Observable<String> rest = sharedRespStream.skip(1);
Observable<String> result = first.mergeWith(rest);

sharedRespStream.connect();

result.subscribe(response -> handleResponse(response), error -> handleError(error));

The code is self explaining: share response to avoid duplicate requests, apply timeout to first item emitted, and merge it with items following first one.

Wilmawilmar answered 4/9, 2016 at 6:22 Comment(1)
tried Your solution, but threre is TimeoutException after timeout that was set on first observable.Isomagnetic
B
2

Kotlin extension for @ndori's answer

fun <T> Observable<T>.timeoutFirstMessage(timeout: Long, unit: TimeUnit): Observable<T> {
    return this.timeout<Long, Long>(
        Observable.timer(timeout, unit),
        Function { Observable.never<Long>() }
    )
}
Bulger answered 28/1, 2020 at 21:58 Comment(0)
B
1

RxJava2 version for @ndori 's answer

Observable.timeout(
  Observable.empty().delay(10, TimeUnit.SECONDS),
  o -> Observable.never()
)

Source docs

Boredom answered 14/12, 2018 at 10:10 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.