RxJS throttle behavior; get first value immediately
Asked Answered
W

3

21

Example Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

I am aware that there are these 3 throttle methods for RxJS (5.0 beta.4):

auditTime(), throttleTime() and debounceTime()

The behavior I am looking for is the one lodash does by default on throttle:

    1. Give me the first value immediately!
    1. on consecutive values, hold values for the given delay, then emit last occurred value
    1. when throttle delay expired, go back to state (1)

In theory this should look like:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

But

  • throttleTime never gives me the last value, if emitted in the throttle timeout
  • debounceTime doesn't trigger immediately
  • auditTime doesn't trigger immediately

Could I combine any of the RxJS methods to achieve the described behavior?

Wolfe answered 10/5, 2016 at 18:16 Comment(0)
W
2

I took the auditTime operator and changed 2 lines to achieve the desired behavior.

New plunker: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

Original:

Changes:

from (auditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

to (auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

So I start the timeout after the value was nexted.

Usage:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))
Wolfe answered 11/5, 2016 at 7:23 Comment(1)
Is there a way to do this in the most recent version without changing the rxjs source?Erased
E
31

For anyone looking for this after 2018: This has been added over a year ago, but for some reason, the documentation has not been updated yet.

RxJS commit

You can simply pass a config object to throttleTime. The default is { leading: true, trailing: false }. To achieve the behavior discussed here you simply have to set trailing to true: { leading: true, trailing: true }

EDIT:

For completeness, here is a working snippet:

import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'

...

observable.pipe(
  throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)
Erased answered 8/10, 2018 at 17:6 Comment(3)
Worth noting if you want to throttle based on something other than time (eg ajax completion) - you can use throttle(() => Promise.resolve(...), { leading: true, trailing:true }) - I found this especially useful for running ajax puts immediately, waiting for completion then running the final put in a series when the first completes.Casual
Does this means that auditTime has become obsolete?Screak
Note that this has the potential disadvantage that you are now emitting the first of every time interval as well as the latest.Brant
C
3

For older RxJs, I wrote a concatLatest operator that does most of what you want. With it, you could get your throttling behavior with this code:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

Here's the operator. I took a stab at updating it to work with RxJS5:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

And here's an updated plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

Note I updated your lodash version to the latest version. In lodash 4.7, I rewrote the throttle/debounce operators to fix some edge case bugs. You were using 4.6.1 which still had some of those bugs, though I don't think they were affecting your test.

Cumulation answered 11/5, 2016 at 2:50 Comment(2)
Hi @Brandon, great answer. Can you provide link to plunkr with concatLatest operator using RxJS 4.x? (I wouldn't have asked since the question is specific to RxJS 5, but you said you already had it handy :)Margarito
@Margarito I wrote this version for RxJS 2. I assume it will work in 4 also: gist.github.com/bman654/92749cd93cdd84a540e41403f25a2105Cumulation
W
2

I took the auditTime operator and changed 2 lines to achieve the desired behavior.

New plunker: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

Original:

Changes:

from (auditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

to (auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

So I start the timeout after the value was nexted.

Usage:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))
Wolfe answered 11/5, 2016 at 7:23 Comment(1)
Is there a way to do this in the most recent version without changing the rxjs source?Erased

© 2022 - 2024 — McMap. All rights reserved.