For older versions of RxJS, I wrote a concatLatest operator that does most of what you want.
With it, you could get your throttling behavior with this code:
const delay = Rx.Observable.empty().delay(500);
inputObservable
.map(value => Rx.Observable.of(value).concat(delay))
.concatLatest()
.subscribe(...);
Here's the operator. I took a stab at updating it to work with RxJS5:
Rx.Observable.prototype.concatLatest = function () {
  /// <summary>
  /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
  /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
  /// away and only the Nth will be observed.
  /// The result completes once the outer sequence has completed and no inner sequence is being observed.
  /// An error from the outer sequence or from any observed inner sequence is forwarded to the observer.
  /// </summary>
  /// <returns type="Rx.Observable"></returns>
  const source = this;
  return Rx.Observable.create(function (observer) {
    let latest; // most recent inner that arrived while another inner was being observed
    let isStopped = false; // the outer sequence has completed
    let isBusy = false; // an inner sequence is currently being observed
    let isDisposed = false; // the teardown below has run
    let innerGeneration = 0; // bumped for every inner subscription attempt
    let outerSubscription;
    let innerSubscription;

    const subscriptions = new Rx.Subscription(function () {
      isDisposed = true;
      latest = undefined;
      if (outerSubscription) {
        outerSubscription.unsubscribe();
      }
      if (innerSubscription) {
        innerSubscription.unsubscribe();
      }
    });
    const onError = observer.error.bind(observer);
    const onNext = observer.next.bind(observer);

    // Subscribes to `inner` and records the resulting subscription so the
    // teardown can cancel it.
    function subscribeToInner(inner) {
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        innerSubscription = undefined;
      }
      const generation = ++innerGeneration;
      const subscription = inner.subscribe(onNext, onError, innerOnComplete);
      if (isDisposed) {
        // The consumer unsubscribed while `inner` was emitting synchronously,
        // i.e. before this subscription could be recorded; the teardown could
        // not see it, so cancel it here instead of leaking it.
        subscription.unsubscribe();
      } else if (generation === innerGeneration) {
        innerSubscription = subscription;
      }
      // Otherwise `inner` completed synchronously and a newer inner was
      // subscribed re-entrantly; keep the newer subscription, not this one.
    }

    // Runs when the observed inner completes: moves on to the pending inner
    // if there is one, otherwise goes idle (and completes if the outer is done).
    function innerOnComplete() {
      if (isDisposed) {
        return;
      }
      const inner = latest;
      if (inner) {
        latest = undefined;
        subscribeToInner(inner);
      } else {
        isBusy = false;
        if (isStopped) {
          observer.complete();
        }
      }
    }

    outerSubscription = source.subscribe(function (newInner) {
      if (isBusy) {
        // Overwrites any inner that was already pending; only the newest is kept.
        latest = newInner;
      } else {
        isBusy = true;
        subscribeToInner(newInner);
      }
    }, onError, function () {
      isStopped = true;
      if (!isBusy) {
        observer.complete();
      }
    });

    return subscriptions;
  });
};
And here's an updated plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview
Note that I updated your lodash version to the latest version. In lodash 4.7, I rewrote the throttle/debounce functions to fix some edge-case bugs. You were using 4.6.1, which still had some of those bugs, though I don't think they were affecting your test.