Catch error if retryWhen:s retries runs out
Asked Answered
M

7

16

In the documentation for RetryWhen the example there goes like this:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);

But how do I propagate the Error if the retries runs out?

Adding .doOnError(System.out::println) after the retryWhen clause does not catch the error. Is it even emitted?

Adding a .doOnError(System.out::println) before retryWhen displays always fails for all retries.

Maeda answered 16/6, 2016 at 13:33 Comment(0)
G
14

The doc for retryWhen says that it passes onError notification to its subscribers and terminates. So you can do something like this:

    final int ATTEMPTS = 3;

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> attempts
            .zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
                    i < ATTEMPTS ?
                            Observable.timer(i, SECONDS) :
                            Observable.error(n))
            .flatMap(x -> x))
            .toBlocking()
            .forEach(System.out::println);
Garman answered 23/12, 2016 at 13:54 Comment(0)
P
7

The Javadoc for retryWhen states that:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

Put simply, if you want to propagate the exception, you'll need to rethrow the original exception once you've had enough retrying.

An easy way is to set your Observable.range to be 1 greater than the number of times you want to retry.

Then in your zip function test the current number of retries. If it's equal to NUMBER_OF_RETRIES + 1, return Observable.error(throwable) or re-throw your exception.

EG

Observable.create((Subscriber<? super String> s) -> {
            System.out.println("subscribing");
            s.onError(new RuntimeException("always fails"));
        }).retryWhen(attempts -> {
            return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
                if (attempt == NUMBER_OF_RETRIES + 1) {
                    throw Throwables.propagate(throwable);
                }
                else {
                    return attempt;
                }
            }).flatMap(i -> {
                System.out.println("delaying retry by " + i + " second(s)");
                return Observable.timer(i, TimeUnit.SECONDS);
            });
        }).toBlocking().forEach(System.out::println);

As an aside doOnError does not affect the Observable in any way - it simply provides you with a hook to perform some action if an error occurs. A common example is logging.

Patricapatrice answered 18/6, 2016 at 18:18 Comment(0)
C
1

One option is using Observable.materialize() to convert Observable.range() items into notifications. Then once onCompleted() is issued, one can propagate error downstream (in sample below Pair is used to wrap Observable.range() notifications and exception from Observable)

   @Test
   public void retryWhen() throws Exception {

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
           .flatMap(notifAndEx -> {
            System.out.println("delay retry by " + notifAndEx + " second(s)");
            return notifAndEx.getRight().isOnCompleted()
                    ? Observable.<Integer>error(notifAndEx.getLeft())
                    : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
        });
    }).toBlocking().forEach(System.out::println);
}

    private static class Pair<L,R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }
Chrysarobin answered 16/6, 2016 at 14:43 Comment(0)
G
0

You can get the behaviour you want using the RetryWhen builder in rxjava-extras which is on Maven Central. Use the latest version.

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}) 
.retryWhen(RetryWhen
   .delays(Observable.range(1, 3)
               .map(n -> (long) n), 
            TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println);
Guizot answered 17/6, 2016 at 1:2 Comment(1)
Um, why the down-vote? This is a unit tested library.Guizot
W
0

This is the simplest way I find to implement it in Kotlin. Retries maxRetries times and if there was no successful attempts, passes the error to the source of onErrorResumeNext.

api.getItems()
        .retryWhen { errors ->
            errors.zipWith(Observable.range(1, maxRetries + 1), { error, i ->
                if (i <= maxRetries) {
                    Observable.timer(i.toLong(), TimeUnit.SECONDS)
                } else {
                    throw error
                }
            })
        }.onErrorResumeNext { error ->
            return@onErrorResumeNext Observable.error(factory.create(error))
        }
Wernher answered 26/7, 2021 at 18:14 Comment(0)
P
-1

You need to use onErrorResumeNext after the retryWhen

In your example

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
            if (i == NUMBER_OF_RETRIES + 1) {
                throw Throwables.propagate(n);
            }
            else {
                return i;
            }
        }).flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        });
    })
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage());
                                              return Observable.error(t);
                                          })
    .toBlocking().forEach(System.out::println);

At the bottom of this class you can see a practical example to understand how works. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

Phillipphillipe answered 19/6, 2016 at 19:36 Comment(0)
J
-1

You can either use scan function, which returns a pair with accumulated index and decide whether or not to pass on the error:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value))
            .flatMap(pair -> {
                if(pair.first > MAX_RETRY_COUNT) {
                    throw new RuntimeException(pair.second);
                }
                return Observable.timer(pair.first, TimeUnit.SECONDS);
            });

Or you can stick with zipWith operator but increase the number in range Observable and return a pair, instead of the index alone. That way, you won't loose the information about previous throwable.

attempts
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
    .flatMap(pair -> {
        if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
        System.out.println("delay retry by " + pair.first + " second(s)");
        return Observable.timer(pair.first, TimeUnit.SECONDS);
    });
Joslyn answered 18/12, 2016 at 7:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.