I see that ReactiveX (RxJava) has a timeout operator, which applies to every item in a subscription stream. But I only want to apply a timeout to the very first response and do not care about timeouts for the responses that follow. How can I implement this requirement elegantly with RxJava's operators?
The best option is to use the timeout overload that takes a timeout observable for every item, plus one for the subscription itself (which is the one you are interested in).
observable.timeout(
() -> Observable.empty().delay(10, TimeUnit.SECONDS),
o -> Observable.never()
)
To explain: the first argument, a Func0, runs on subscribe and returns an empty observable (which only emits completion) delayed by the time you want. If that time passes before any item arrives, a timeout is raised, as you wanted. The second argument, a Func1, decides the timeouts between items; you have no use for those, so we just pass never (which does not complete or emit anything).
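To make the intended semantics concrete without depending on RxJava, here is a minimal plain-JDK sketch of the same idea: a bounded wait for the first item and unbounded waits afterwards. The class name FirstItemTimeout and the method takeWithFirstTimeout are hypothetical names made up for this illustration, and a BlockingQueue stands in for the observable; it is not how the timeout operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration: a queue plays the role of the response stream.
final class FirstItemTimeout {

    /**
     * Reads {@code count} items from the queue. Only the wait for the first
     * item is bounded; every later item is awaited indefinitely.
     */
    public static <T> List<T> takeWithFirstTimeout(BlockingQueue<T> source, int count,
                                                   long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        List<T> items = new ArrayList<>();
        if (count <= 0) {
            return items;
        }
        // Bounded wait: corresponds to the first timeout selector.
        T first = source.poll(timeout, unit);
        if (first == null) {
            throw new TimeoutException("No first item within " + timeout + " " + unit);
        }
        items.add(first);
        // Unbounded waits: corresponds to returning never() for later items.
        while (items.size() < count) {
            items.add(source.take());
        }
        return items;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
                responses.put("first");   // arrives before the 200 ms limit
                Thread.sleep(300);
                responses.put("second");  // the 300 ms gap is not subject to the limit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(takeWithFirstTimeout(responses, 2, 200, TimeUnit.MILLISECONDS));
    }
}
```

The gap before the second item is longer than the timeout, yet no exception is raised, which is the behaviour the question asks for.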
Another option, following Luciano's suggestion, is to do it like this:
public static class TimeoutFirst<T> implements Transformer<T,T> {
private final long timeout;
private final TimeUnit unit;
private TimeoutFirst(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
}
@Override
public Observable<T> call(Observable<T> observable) {
return Observable.amb(observable,
Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name()))));
}
}
public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit unit) {
return new TimeoutFirst<>(timeout, unit);
}
This is a pretty neat solution using amb. It can be applied with observable.compose(timeoutFirst(10, TimeUnit.SECONDS)).
Here is a more functional way of doing it. It's in Scala, but it can be transcribed to Java:
val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!"))
myStream amb myTimeout
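The amb operator mirrors whichever of the observables emits first (an item or a notification) and ignores the other, so if the timer's error arrives before the first item of myStream, the result is a timeout.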
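The race that amb sets up can be sketched with plain JDK types. The following is only a model of the first-signal race, not of amb itself: a CompletableFuture stands in for "the first item of the stream", and the names AmbSketch and raceAgainstTimer are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of racing a source against an erroring timer.
final class AmbSketch {

    // Daemon timer thread so the JVM can exit without an explicit shutdown.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "amb-timer");
                t.setDaemon(true);
                return t;
            });

    /** Completes with whichever signal arrives first: the source's result or the timer's error. */
    public static <T> CompletableFuture<T> raceAgainstTimer(
            CompletableFuture<T> source, long timeout, TimeUnit unit) {
        CompletableFuture<T> winner = new CompletableFuture<>();
        // Only the first call to complete/completeExceptionally takes effect.
        source.whenComplete((value, error) -> {
            if (error != null) {
                winner.completeExceptionally(error);
            } else {
                winner.complete(value);
            }
        });
        TIMER.schedule(() -> {
            winner.completeExceptionally(new TimeoutException("I timed out!"));
        }, timeout, unit);
        return winner;
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("response");
        System.out.println(raceAgainstTimer(fast, 1, TimeUnit.SECONDS).join());

        CompletableFuture<String> silent = new CompletableFuture<>(); // never completes
        raceAgainstTimer(silent, 100, TimeUnit.MILLISECONDS)
                .exceptionally(e -> "timed out")
                .thenAccept(System.out::println)
                .join();
    }
}
```

Once the source has won the race, the timer's later error is ignored, which is why the timeout only guards the first signal.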
One way to do it is as follows:
Observable<Response> respStream = respStream();
// publish() shares one upstream subscription between both branches
ConnectableObservable<Response> sharedRespStream = respStream.publish();
// the timeout applies only to the first item
Observable<Response> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS);
Observable<Response> rest = sharedRespStream.skip(1);
Observable<Response> result = first.mergeWith(rest);
// subscribe before connecting so that no emissions are missed
result.subscribe(response -> handleResponse(response), error -> handleError(error));
sharedRespStream.connect();
The code is self-explanatory: share the response stream to avoid duplicate requests, apply the timeout to the first item emitted, and merge it with the items that follow the first one.
Kotlin extension for @ndori's answer
fun <T> Observable<T>.timeoutFirstMessage(timeout: Long, unit: TimeUnit): Observable<T> {
return this.timeout<Long, Long>(
Observable.timer(timeout, unit),
Function { Observable.never<Long>() }
)
}
RxJava2 version for @ndori's answer
observable.timeout(
Observable.empty().delay(10, TimeUnit.SECONDS),
o -> Observable.never()
)
TimeoutException after timeout that was set on first observable. – Isomagnetic