RxJs: poll until interval done or correct data received
How do I execute the following scenario in the browser with RxJS:

  • submit data to queue for processing
  • get back the job id
  • poll another endpoint every 1s until the result is available or 60 seconds have passed (then fail)

Intermediate solution that I've come up with:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
  1. Is there a way, without intermediate variables, to stop the timer once the data arrives or an error occurs? I know I could introduce a new observable and then use takeUntil.
  2. Is the flatMap usage here semantically correct? Maybe this whole thing should be rewritten and not chained with flatMap?
Cannon answered 15/3, 2016 at 9:56 Comment(0)
E
45

Starting from the top, you've got a promise that you turn into an observable. Once this yields a value, you want to make a call once per second until you receive a certain response (success) or until a certain amount of time has passed. We can map each part of this explanation to an Rx method:

"Once this yields a value" = map/flatMap (flatMap in this case because what comes next will also be observables, and we need to flatten them out)

"once per second" = interval

"receive a certain response" = filter

"or" = amb

"certain amount of time has passed" = timer

From there, we can piece it together like so:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

Once we've got our initial result, we project that into a race between two observables, one that will yield a value when it receives a successful response, and one that will yield a value when a certain amount of time has passed. The second flatMap there is because .throw isn't present on observable instances, and the method on Rx.Observable returns an observable which also needs to be flattened out.

It turns out that the amb / timer combo can actually be replaced by timeout, like so:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

I omitted the .delay you had in your sample as it wasn't described in your desired logic, but it could be fitted trivially to this solution.

So, to directly answer your questions:

  1. In the code above there is no need to manually stop anything, as the interval will be disposed of the moment the subscriber count drops to zero, which will occur either when the take(1) or amb / timeout completes.
  2. Yes, both usages in your original were valid, as in both cases you were projecting each element of an observable into a new observable, and wanting to flatten the resultant observable of observables out into a regular observable.

Here's the jsbin I threw together to test the solution (you can tweak the value returned in pollQueueForResult to obtain the desired success/timeout; times have been divided by 10 for the sake of quick testing).

Existence answered 15/3, 2016 at 13:32 Comment(4)
@matt-burnell Extremely good answer, which helped me to no end! Do you have any tips with regard to doing exponential back-off? - Burglarize
@VegardLarsen Well, in this case, if you wanted to back off the poll interval, all you'd need to do is replace the interval(1000) stream with something with the values you wanted. For example, you could use a combination of Observable.just, delay, and merge to create a stream that would yield values at the 1, 2, 4, 8, 16, and 32 second marks (extremely low-tech; one could of course write a function to express this more elegantly). - Existence
@matt-burnell Of course. Reactive can be a bit complicated the first time you look at it, thank you. :) - Burglarize
Anybody care to try this in the new version of RxJS (pipes, etc.)? - Provost
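
As a rough illustration of the back-off idea from the comments above, the sketch below computes the second marks mentioned there (1, 2, 4, 8, 16, 32). The helper name backoffMarks and its parameters are hypothetical and not part of the thread or of RxJS; it is plain JavaScript and only produces the schedule, it does not poll anything.

```javascript
// Hypothetical helper (not from the thread, not an RxJS API): returns the
// absolute times, in milliseconds, at which a poll would fire when each
// mark is double the previous one.
function backoffMarks(baseMs, count) {
  const marks = [];
  for (let i = 0; i < count; i++) {
    // i = 0 -> baseMs, i = 1 -> 2 * baseMs, i = 2 -> 4 * baseMs, ...
    marks.push(baseMs * 2 ** i);
  }
  return marks;
}

const marks = backoffMarks(1000, 6);
console.log(marks); // marks at 1, 2, 4, 8, 16 and 32 seconds, in ms
```

In the RxJS 4 style used in the answer, each mark could presumably be turned into an Rx.Observable.timer(mark) and the timers combined with Rx.Observable.merge to stand in for Rx.Observable.interval(1000); that wiring is an assumption and is not tested here.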
K
16

A small optimization to the excellent answer from @matt-burnell: you can replace the filter and take operators with the first operator, as follows:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))

  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );

Also, for people that may not know, the flatMap operator is an alias for mergeMap in RxJS 5.0.

Kirin answered 30/11, 2016 at 9:24 Comment(1)
According to github.com/ReactiveX/rxjs/blob/master/MIGRATION.md flatMap is still valid in RxJS 5. - Cannon
N
2

Not your question, but I needed the same functionality

import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

function isAsyncThingSatisfied(result) {
  return true
}

export function doAsyncThingSeveralTimesWithTimeout(
  doAsyncThingReturnsPromise,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  const subject$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => doAsyncThingReturnsPromise()),
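      // keep polling while the result is NOT yet satisfied; the first satisfying result is emitted last
      takeWhileInclusive(result => !isAsyncThingSatisfied(result)),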
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
    )
  )

  return subject$.toPromise(Promise) // resolves with the first satisfying result of doAsyncThingReturnsPromise, or rejects on timeout
}

Example

// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

export function mailhogWaitForNEmails(
  mailhogClient,
  numberOfExpectedEmails,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  let tries = 0

  const mails$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => mailhogClient.getAll()),
      takeWhileInclusive(mails => {
        tries += 1
        return mails.total < numberOfExpectedEmails
      }),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
    )
  )

  // toPromise returns promise which contains the last value from the Observable sequence.
  // If the Observable sequence is in error, then the Promise will be in the rejected stage.
  // If the sequence is empty, the Promise will not resolve.
  return mails$.toPromise(Promise)
}

// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'

export async function mailhogWaitForEmailAndClean(mailhogClient) {
  const mails = await mailhogWaitForNEmails(mailhogClient, 1)

  if (mails.count !== 1) {
    throw new Error(
      `Expected to receive 1 email, but received ${mails.count} emails`,
    )
  }

  await mailhogClient.deleteAll()

  return mails.items[0]
}
Namnama answered 22/2, 2019 at 19:17 Comment(0)
S
2

We have the same use case, and the code below works pretty well.

import { timer, Observable } from "rxjs";
import { scan, tap, switchMapTo, first } from "rxjs/operators";

function checkAttempts(maxAttempts: number) {
  return (attempts: number) => {
    if (attempts > maxAttempts) {
      throw new Error("Error: max attempts");
    }
  };
}

export function pollUntil<T>(
  pollInterval: number,
  maxAttempts: number,
  responsePredicate: (res: any) => boolean
) {
  return (source$: Observable<T>) =>
    timer(0, pollInterval).pipe(
      scan(attempts => ++attempts, 0),
      tap(checkAttempts(maxAttempts)),
      switchMapTo(source$),
      first(responsePredicate)
    );
}

If the number of attempts exceeds the limit, an error is thrown, which terminates the output stream and unsubscribes from the source. Moreover, HTTP requests are only made while the condition defined by responsePredicate is not yet met.

Example usage:
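
To make the timing of that attempt limit concrete, here is a small sketch in plain JavaScript that mirrors the arithmetic of timer(0, pollInterval) combined with the attempts > maxAttempts check. The helper name pollSchedule is hypothetical; it runs no RxJS code and only computes when polls would start and when the error would be thrown, assuming the predicate never matches before then.

```javascript
// Hypothetical helper (not from the answer): timer(0, pollInterval) emits at
// 0, pollInterval, 2 * pollInterval, ... and scan numbers those emissions
// 1, 2, 3, ...; the tap throws on the first emission whose number exceeds
// maxAttempts.
function pollSchedule(pollInterval, maxAttempts) {
  const pollTimes = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // attempt n is triggered by the n-th timer emission
    pollTimes.push((attempt - 1) * pollInterval);
  }
  // emission number maxAttempts + 1 arrives here and triggers the error
  const errorAt = maxAttempts * pollInterval;
  return { pollTimes, errorAt };
}

const schedule = pollSchedule(1000, 3);
console.log(schedule.pollTimes); // polls start at 0, 1000 and 2000 ms
console.log(schedule.errorAt);   // error at 3000 ms if nothing matched earlier
```

So with a 1000 ms interval and 3 attempts, a response that only becomes valid after 1500 ms would be picked up by the third poll at 2000 ms, before the error at 3000 ms.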

import { of } from "rxjs";

import { pollUntil } from "./poll-until-rxjs";

const responseObj = { body: { inProgress: true } };
const response$ = of(responseObj);
// this is to simulate a http call
response$
  .pipe(pollUntil(1000, 3, ({ body }) => !body.inProgress))
  .subscribe(({ body }) => console.log("Response body: ", body));

setTimeout(() => (responseObj.body.inProgress = false), 1500);
Severity answered 5/5, 2020 at 21:54 Comment(1)
What happens in case the service returns an error (status code != 200)? - Manley
R
1

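Angular / TypeScript rewrite of the solution above:

import { Injectable } from '@angular/core';
import { interval, Observable } from 'rxjs';
import { exhaustMap, first, timeout } from 'rxjs/operators';

export interface PollOptions {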
  interval: number;
  timeout: number;
}

const OPTIONS_DEFAULT: PollOptions = {
  interval: 5000,
  timeout: 60000
};
@Injectable()
class PollHelper {
  startPoll<T>(
    pollFn: () => Observable<T>, // intermediate polled responses
    stopPollPredicate: (value: T) => boolean, // condition to stop polling
    options: PollOptions = OPTIONS_DEFAULT): Observable<T> {
    return interval(options.interval)
      .pipe(
        exhaustMap(() => pollFn()),
        first(value => stopPollPredicate(value)),
        timeout(options.timeout)
      );
  }
}

Example:

pollHelper.startPoll<Response>(
  // function that provides the polling observable
  () => httpClient.get<Response>(...),
  // stop polling predicate
  response => response.isDone()
).subscribe(result => {
  console.log(result);
});
Rex answered 22/3, 2020 at 14:13 Comment(1)
The only answer that is up to date and waits until the previous request is done. Thanks a lot! - Cumulous
