How do I adjust timeout duration on retry using RxJS?

I'm working with the latest Angular and Typescript and RxJS 5.

Angular has currently made RxJS a necessity. I've used C# primarily for over 10 years and I'm very much used to Linq/Lambdas/fluent syntax which I assume formed the basis of Reactive.

I would like to make an Http get call with an increasing timeout value on retry, but I'm having a problem seeing how to do that and still keeping everything in the pipeline (not using external state).

I get that I can do this, but it will just retry using the same timeout value.

myHttpObservable.timeout(1000).retry(2);

The documentation for RxJS has been poor in many places and asking about it on here only got my question deleted out of existence, which is sad...so I was forced to look through the source.

Is there a way to retry with an increasing timeout duration each time in a way that keeps state in the pipeline? Also, I want an initial timeout on the first attempt.

I've tried things similar to this at first, but realized the confusing retryWhen operator is not really intended for what I want:

myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});

I know I could accomplish this using external state, but I'm basically looking for an elegant solution that, I think, is what they are kind of driving for with the reactive style of programming.

Clambake answered 26/1, 2017 at 11:42 Comment(3)
Hmm... (and I completely agree about the RxJS documentation; we all have to use it, but it's a struggle to do anything beyond the basics) would it be possible to use a seed or accumulator object whose value you could modify, thereby keeping it in the pipeline/stream, and somehow use that value to modify your timeouts? - Subshrub
As for the documentation, learnrxjs.io is a good no-nonsense alternative. Also, RxJS 4 is well-documented, and the differences are known. - Outgoing
Do you mean RxJS 5? - Subshrub
E
12

One of the biggest issues with RxJs5 at the moment is the documentation. It is really fragmented and not up to par with the previous version yet. By looking at the documentation of RxJs4 you can see that .retryWhen() already has an example for building an exponential backoff available which can be easily migrated towards RxJs5:

Rx.Observable.throw(new Error('splut'))
  .retryWhen(attempts => Rx.Observable.range(1, 3)
    .zip(attempts, i => i)
    .mergeMap(i => {
      console.log("delay retry by " + i + " second(s)");
      return Rx.Observable.timer(i * 1000);
    })
  ).subscribe();
Elevated answered 26/1, 2017 at 12:2 Comment(4)
Yeah, we need a really complete book. Not just a recipes/field guide thing, but a "thinking in streams" sort of book. Most JS developers I've seen try to work with RxJS frequently default to some external state when running into anything more than a basic mapping or some such, because the whole "concept" is semi-foreign at this point (though this will change dramatically in the next couple of years, I think). - Subshrub
Mark, I used your example in a plunker and it worked pretty well! One problem I have, though, is how to get an initial timeout before the retryWhen (the initial timeout just gets resubscribed if I use one). I want to make a get with timeout duration = X and, if it times out, retry with timeout = X+1. - Clambake
Hi Brandon, I have been thinking about how to solve your problem and found a solution. Because it is a different approach than this answer, I have posted it on your question as a new answer. Hope it helps! - Elevated
Sorry, I meant to comment on and mark this one as the final correct version. - Clambake
M
7

There is an operator in backoff-rxjs npm package to deal with this case called retryBackoff. I described it in the article at blog.angularindepth.com, but in a nutshell it's this:

source.pipe(
  retryWhen(errors => errors.pipe(
    concatMap((error, iteration) =>
      // exponential delay, capped at maxInterval
      timer(Math.min(Math.pow(2, iteration) * initialInterval, maxInterval))
    )
  ))
);

Here are the sources for the more customizable version.

Meredith answered 3/11, 2018 at 3:22 Comment(0)
E
6

Based on additional input in the comments of my previous answer I have been experimenting with the .expand() operator to solve your requirement:

I want to make a get with timeout duration = X and if it times out, then retry with timeout = X+1

The following snippet starts with timeout = 500 and for each attempt the timeout is increased with attempt * 500 until maxAttempts has been reached or the result has been successfully retrieved:

getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );



/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
 retrieve info from server with timeout based on attempt
 NOTE: does not error the stream, so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

How does this work?

Every value produced upstream or by the .expand() operator is emitted downstream AND fed back into .expand() as input. This continues until no further values are emitted. By leveraging this behaviour, the get() function is re-attempted with an incremented attempt number whenever the emission contains an .error property.

The get() function does not throw an Error, because otherwise we would need to catch it inside .expand() or the recursion would break unexpectedly.

When maxAttempts is exceeded, the .expand() operator throws an Error, stopping the recursion. When no .error property is present in the emission, we treat it as a successful result and return an empty Observable, stopping the recursion.

NOTE: .filter() removes every emission that has an .error property, because all values produced by .expand() are also emitted downstream, and these .error values are internal state.
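To make the feedback loop described above easier to follow, here is a minimal, dependency-free sketch. It is not the RxJS operator: expandSync, fakeGet and getWithExpandingTimeoutSync are hypothetical names, everything runs synchronously, and the fake request is simply assumed to succeed once its timeout (attempt * 500 ms) is strictly longer than 2500 ms.

```javascript
// Simplified synchronous stand-in for expand(): NOT the RxJS operator.
// Each value is emitted and also fed back into project(); the recursion
// stops when project() returns no further values.
function expandSync(seed, project) {
  const emitted = [];
  const pending = [seed];
  while (pending.length > 0) {
    const value = pending.shift();
    emitted.push(value);
    pending.push(...project(value));
  }
  return emitted;
}

// Hypothetical fake request: it "takes" 2500 ms and is assumed to succeed
// only when the timeout (attempt * 500 ms) is strictly longer than that.
function fakeGet(attempt) {
  return attempt * 500 > 2500
    ? { attempt, result: 'ok' }
    : { attempt, error: new Error('timeout') };
}

function getWithExpandingTimeoutSync(maxAttempts) {
  return expandSync(fakeGet(1), res => {
    if (!res.error) return [];                  // success: stop recursing
    const nextAttempt = res.attempt + 1;
    if (nextAttempt > maxAttempts) {
      throw new Error('Max attempts reached');  // give up
    }
    return [fakeGet(nextAttempt)];              // retry with a longer timeout
  }).filter(res => !res.error);                 // hide internal error states
}

console.log(getWithExpandingTimeoutSync(10)); // [ { attempt: 6, result: 'ok' } ]
```

The five failed attempts pass through the loop as plain values, which is why the final filter is needed; calling getWithExpandingTimeoutSync(3) throws instead, mirroring the max-attempts branch.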

Elevated answered 4/2, 2017 at 20:54 Comment(1)
We did something similar basically and it seems to do fine. Although we didn't use the expand operator I like the way you employed it. I might refactor our stuff along those lines later, thanks. - Clambake
M
4

I use delayWhen to create a notifier function that makes retryWhen emit after an increasing delay on each error occurrence. You can choose a different time series by changing the formula used to calculate the delay between retries. See the two example formulas below for the linear and exponential backoff retry strategies. Note also how I limit the number of retries and throw an error if I reach that limit.

// RxJS 6 imports
import { throwError, timer } from 'rxjs';
import { retryWhen, delayWhen, take, concat } from 'rxjs/operators';

const initialDelay = 1000;
const maxRetries = 4;
throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        // i is the zero-based index of the error notification
        delayWhen((_,i) => {
          // const delay = (i+1)*initialDelay;            // linear
          const delay = Math.pow(2,i)*initialDelay;       // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        // stop after maxRetries notifications, then fail the stream
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );
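As a rough illustration of the two formulas, the sketch below lists the delay each one produces for i = 0 .. maxRetries - 1. It uses no RxJS; backoffDelays and its strategy argument are hypothetical names introduced only for this example, and the (i+1)*initialDelay formula is labelled 'linear' here because it grows by a constant step.

```javascript
// Hypothetical helper: lists the delay (in ms) computed for each error index i.
function backoffDelays(initialDelay, maxRetries, strategy) {
  const delays = [];
  for (let i = 0; i < maxRetries; i++) {
    const delay = strategy === 'exponential'
      ? Math.pow(2, i) * initialDelay   // doubles on every retry
      : (i + 1) * initialDelay;         // grows by a constant step
    delays.push(delay);
  }
  return delays;
}

console.log(backoffDelays(1000, 4, 'linear'));      // [ 1000, 2000, 3000, 4000 ]
console.log(backoffDelays(1000, 4, 'exponential')); // [ 1000, 2000, 4000, 8000 ]
```

With the same initialDelay, the two series only start to diverge from the third retry onward.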
Metempsychosis answered 18/12, 2018 at 12:9 Comment(0)
P
0

I don't think backoff is the right answer here. A backoff delay is the time between an error and the next request, not the time a request is allowed to run. The RxJS timeout fires when the source takes longer than the timeout value. I want to change the timeout of the request itself, not add a delay between the error and the next request. Does anyone know how to do that?

For example:
Request: takes 8s, timeout: 3s, timeout on retry: 3 * 2 * retryIndex
Expected: the request is retried twice and the final attempt succeeds after 8s.
Backoff: no timeout is used, so this request succeeds on the first attempt. If the request fails with some other error, the next request is sent after a delay of 3 * 2 * retryIndex.
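The arithmetic of that scenario can be checked with a small model. This is only a sketch under stated assumptions: no real requests are made, simulateGrowingTimeout is a hypothetical name, the first attempt is assumed to use the base timeout of 3s, and retry n is assumed to use 3 * 2 * n seconds.

```javascript
// Hypothetical model: no real requests are made, only the arithmetic is shown.
function simulateGrowingTimeout(requestSeconds, baseTimeout, maxRetries) {
  const attempts = [];
  let elapsed = 0;
  for (let retryIndex = 0; retryIndex <= maxRetries; retryIndex++) {
    // Assumption: first attempt uses the base timeout; retry n uses base * 2 * n.
    const timeout = retryIndex === 0 ? baseTimeout : baseTimeout * 2 * retryIndex;
    const succeeded = requestSeconds <= timeout;
    // A failed attempt costs its full timeout; a successful one costs the request time.
    elapsed += succeeded ? requestSeconds : timeout;
    attempts.push({ retryIndex, timeout, succeeded });
    if (succeeded) break;
  }
  return { attempts, elapsed };
}

const run = simulateGrowingTimeout(8, 3, 5);
console.log(run.attempts.map(a => a.timeout)); // [ 3, 6, 12 ]
console.log(run.elapsed);                      // 17
```

Under these assumptions the request is retried twice, the final attempt takes 8s, and the total wall-clock time is 3 + 6 + 8 = 17s.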

Potentilla answered 19/7, 2021 at 2:30 Comment(0)
