How do I rate limit requests losslessly using RxJS 5

I would like to make a series of requests to a server, but the server has a hard rate limit of 10 requests per second. If I try to make the requests in a loop, it will hit the rate limit since all the requests will happen at essentially the same time.

for(let i = 0; i < 20; i++) {
  sendRequest();
}

ReactiveX has lots of tools for modifying observable streams, but I can't seem to find the tools to implement rate limiting. I tried adding a standard delay, but the requests still fire at the same time, just 100ms later than they did previously.

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);

The debounceTime and throttleTime operators are similar to what I'm looking for as well, but they are lossy rather than lossless. I want to preserve every request that I make instead of discarding the earlier ones.

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...

How do I make these requests to the server without exceeding the rate limit using ReactiveX and Observables?

Euler answered 16/2, 2017 at 20:31 Comment(0)

The implementation in the OP's self-answer (and in the linked blog) always imposes a delay, which is less than ideal.

If the rate-limited service allows for 10 requests per second, it should be possible to make 10 requests in, say, 10 milliseconds, as long as the next request is not made for another 990 milliseconds.

The implementation below applies a variable delay to ensure the limit is enforced, and the delay is applied only to requests that would otherwise exceed the limit.

function rateLimit(source, count, period) {

  return source
    .scan((records, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all values received within the last period.

      records = records.filter((record) => record.until > since);
      if (records.length >= count) {

        // until is the time until which the value should be delayed.

        const firstRecord = records[0];
        const lastRecord = records[records.length - 1];
        const until = firstRecord.until + (period * Math.floor(records.length / count));

        // concatMap is used below to guarantee the values are emitted
        // in the same order in which they are received, so the delays
        // are cumulative. That means the actual delay is the difference
        // between the until times.

        records.push({
          delay: (lastRecord.until < now) ?
            (until - now) :
            (until - lastRecord.until),
          until,
          value
        });
      } else {
        records.push({
          delay: 0,
          until: now,
          value
        });
      }
      return records;

    }, [])
    .concatMap((records) => {

      const lastRecord = records[records.length - 1];
      const observable = Rx.Observable.of(lastRecord.value);
      return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 30),
  10,
  1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Dottiedottle answered 16/2, 2017 at 23:4 Comment(1)
FYI, I've updated this answer. The until calculation was not optimal; it should be based on the first record, not on now. – Dottiedottle

This blog post does a great job of explaining that RxJS is great at discarding events, and how its author arrived at this answer, but ultimately, the code you're looking for is:

queueRequest$
  .concatMap(queueData => Rx.Observable.of(queueData).delay(100))
  .subscribe(() => {
    sendRequest();
  });

concatMap concatenates each newly created observable to the back of the observable stream. Additionally, using delay pushes back each event by 100ms, allowing 10 requests to happen per second. You can view the full JSBin here, which logs to the console instead of firing requests.
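
If you also need the responses, the same pattern extends naturally: rate limit with concatMap plus delay, then flatten the actual request. A minimal sketch in the same RxJS 5 style, assuming sendRequest takes the queued id and returns a Promise (that part is an assumption, not from the original code):

queueRequest$
  .concatMap(queueData => Rx.Observable.of(queueData).delay(100))
  // assumes sendRequest(queueData) returns a Promise; use mergeMap on it directly if it returns an Observable
  .mergeMap(queueData => Rx.Observable.fromPromise(sendRequest(queueData)))
  .subscribe(response => console.log(response));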

Euler answered 16/2, 2017 at 20:31 Comment(0)

Actually, there's an easier way to do this with the bufferTime() operator and its three arguments:

bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)

This means we can use bufferTime(1000, null, 10), which emits a buffer once it holds 10 items or once 1s has elapsed, whichever comes first. The null means we want to open a new buffer immediately after the current buffer is emitted.

function mockRequest(val) {
  return Observable
    .of(val)
    .delay(100)
    .map(val => 'R' + val);
}

Observable
  .range(0, 55)
  .concatMap(val => Observable.of(val)
    .delay(25) // async source of values
    // .delay(175)
  )

  .bufferTime(1000, null, 10) // collect all items for 1s

  .concatMap(buffer => Observable
    .from(buffer) // make requests
    .delay(1000)  // delay this batch by 1s (rate-limit)
    .mergeMap(value => mockRequest(value)) // collect results regardless their initial order
    .toArray()
  )
  // .timestamp()
  .subscribe(val => console.log(val));

See live demo: https://jsbin.com/mijepam/19/edit?js,console

You can experiment with different initial delays. With only 25ms, the requests will be sent in batches of 10:

[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ]
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ]
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ]
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ]
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]

But with .delay(175) we'll emit batches of fewer than 10 items because we're limited by the 1s delay.

[ 'R0', 'R1', 'R2', 'R3', 'R4' ]
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ]
[ 'R11', 'R12', 'R13', 'R14', 'R15' ]
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ]
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ]
[ 'R28', 'R29', 'R30', 'R31', 'R32' ]
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ]
[ 'R39', 'R40', 'R41', 'R42', 'R43' ]
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]

There's however one difference from what you might need. This solution initially starts emitting values after a 2s delay because of the .bufferTime(1000, ...) and delay(1000). All subsequent emissions happen 1s apart.

You could eventually use:

.bufferTime(1000, null, 10)
.mergeAll()
.bufferCount(10)

This will always collect 10 items and only then perform the requests. This would probably be more efficient.
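
For completeness, here's a sketch of how that variant could slot into the pipeline above, reusing mockRequest and the same source (treat it as an outline rather than a tested drop-in):

Observable
  .range(0, 55)
  .concatMap(val => Observable.of(val).delay(25)) // async source of values
  .bufferTime(1000, null, 10)
  .mergeAll()       // flatten the time-based buffers back into single values
  .bufferCount(10)  // re-buffer into full batches of 10
  .concatMap(buffer => Observable
    .from(buffer)
    .delay(1000)    // rate-limit: one batch per second
    .mergeMap(value => mockRequest(value))
    .toArray()
  )
  .subscribe(val => console.log(val));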

Peebles answered 17/2, 2017 at 11:2 Comment(0)

I wrote a library to do this: you set the maximum number of requests per interval, and it rate limits observables by delaying subscriptions. It's tested and comes with examples: https://github.com/ohjames/rxjs-ratelimiter

Ilene answered 11/6, 2017 at 0:36 Comment(0)

Go with Adam’s answer. However, bear in mind the traditional of().delay() will actually add a delay before every element. In particular, this will delay the first element of your observable, as well as any element that wasn’t actually rate limited.

Solution

You can work around this by having your concatMap return a stream of observables that immediately emit a value, but only complete after a given delay:

new Observable(sub => {
  sub.next(v);                             // emit the value right away
  setTimeout(() => sub.complete(), delay); // but keep the inner observable open for `delay` ms
})

This is kind of a mouthful, so I’d create a function for it. That said, since there’s no use for this outside of actual rate limiting, you’d probably be better served just writing a rateLimit operator:

import { asyncScheduler, MonoTypeOperatorFunction, Observable, SchedulerLike } from 'rxjs';
import { concatMap } from 'rxjs/operators';

function rateLimit<T>(
    delay: number,
    scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
  return concatMap(v => new Observable<T>(sub => {
    sub.next(v);
    scheduler.schedule(() => sub.complete(), delay);
  }));
}

Then:

queueRequest$.pipe(
    rateLimit(100),
  ).subscribe(...);

Limitation

This will now create a delay after every element. This means that if your source observable emits its last value and then completes, the resulting rate-limited observable will have a little delay between its last value and its completion.

Strephon answered 29/7, 2019 at 14:38 Comment(1)
Would love to have the version that compiles when strictNullChecks is on. – Marden

Updated cartant's answer as a pipeable operator for newer RxJS versions:

import { Observable, of } from 'rxjs';
import { concatMap, delay, scan } from 'rxjs/operators';

function rateLimit(count: number, period: number) {
  return <ValueType>(source: Observable<ValueType>) => {
    return source.pipe
      ( scan((records, value) => {
          let now = Date.now();
          let since = now - period;

          // Keep a record of all values received within the last period.
          records = records.filter((record) => record.until > since);
          if (records.length >= count) {
            // until is the time until which the value should be delayed.
            let firstRecord = records[0];
            let lastRecord = records[records.length - 1];
            let until = firstRecord.until + (period * Math.floor(records.length / count));

            // concatMap is used below to guarantee the values are emitted
            // in the same order in which they are received, so the delays
            // are cumulative. That means the actual delay is the difference
            // between the until times.
            records.push(
              { delay: (lastRecord.until < now) ?
                  (until - now) :
                  (until - lastRecord.until)
              , until
              , value });
          } else {
            records.push(
              { delay: 0
              , until: now
              , value });
          }

          return records;
        }, [] as RateLimitRecord<ValueType>[])
    , concatMap((records) => {
        let lastRecord = records[records.length - 1];
        let observable = of(lastRecord.value);
        return lastRecord.delay ? observable.pipe(delay(lastRecord.delay)) : observable;
      }) );
  };
}

interface RateLimitRecord<ValueType> {
  delay: number;
  until: number;
  value: ValueType;
}
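
A quick usage sketch, mirroring the demo from the answer above (the range import is an assumption for RxJS 6+; adjust to your setup):

import { range } from 'rxjs';

const start = Date.now();
range(1, 30)
  .pipe(rateLimit(10, 1000))
  .subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));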
Assassin answered 20/7, 2022 at 18:40 Comment(0)
