RxJS: debounce a stream only if distinct
Asked Answered
S

7

10

I want to debounce a stream - but only if the source value is the same as before. How would I do this with RxJS 5?

I do not want to emit a value if the value is the same and I emitted it previously within a specified time window. I should be able to use the value from the stream - or compare function similar to distinctUntilChanged.

Swum answered 14/6, 2017 at 22:36 Comment(2)
"I should be able to use the value from the stream - or compare function similar to distinctUntilChanged." This may be what you're talking about, but distinctUntilChanged(compare: function) allows you to define an arbitrary comparator where you could check the time between emissions (maybe even use the timestamp operator as well).Lippe
Similar but more broad term answer here: #53045481Equitant
T
5

I'm not aware of any way to do this with without creating your own operator because you need to maintain some sort of state (the last seen value).

One way looks something like this:

// I named this debounceDistinctUntilChanged but that might not be
// the best name. Name it whatever you think makes sense!

function debounceDistinctUntilChanged(delay) {
  const source$ = this;

  return new Observable(observer => {
    // Using an object as the default value
    // so that the first time we check it
    // if its the same its guaranteed to be false
    // because every object has a different identity.
    // Can't use null or undefined because source may
    // emit these!
    let lastSeen = {};

    return source$
      .debounce(value => {
        // If the last value has the same identity we'll
        // actually debounce
        if (value === lastSeen) {
          return Observable.timer(delay);
        } else {
          lastSeen = value;
          // This will complete() right away so we don't actually debounce/buffer
          // it at all
          return Observable.empty();
        }
      })
      .subscribe(observer);
  });
}

Now that you see an implementation you may (or may not) find it differs from your expectations. Your description actually left out certain details, like if it should only be the last value you keep during the debounce time frame or if it's a set--basically distinctUntilChanged vs. distinct. I assumed the later.

Either way hopefully this gives you a starting point and reveals how easy it is to create custom operators. The built in operators definitely do not provide solutions for everything as-is, so any sufficiently advanced app will need to make their own (or do the imperative stuff inline without abstracting it, which is fine too).

You can then use this operator by putting it on the Observable prototype:

Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;

// later
source$
  .debounceDistinctUntilChanged(400)
  .subscribe(d => console.log(d));

Or by using let:

// later
source$
  .let(source$ => debounceDistinctUntilChanged.call($source, 400))
  .subscribe(d => console.log(d));

If you can, I recommend truly understanding what my code does, so that in the future you are able to easily make your own solutions.

Trixie answered 15/6, 2017 at 18:42 Comment(1)
There is a critical bug with this solution: if two values are rapidly fired and the first is different from the second, only the second value will be emitted!Anesthetize
H
12

It depends on what you're trying to do; I came upon this question when I was trying to do something similar, basically debouncing but with different debounces for different values of an object.

After trying the solution from jayphelps I couldn't get it to behave as I wanted. After much back and forth, turns out there is an in built easy way to do it: groupby.

const priceUpdates = [
  {bid: 10, id: 25},
  {bid: 20, id: 30},
  {bid: 11, id: 25},
  {bid: 21, id: 30},
  {bid: 25, id: 30}
];//emit each person
const source = Rx.Observable.from(priceUpdates);
//group by age
const example = source
  .groupBy(bid => bid.id)
  .mergeMap(group$ => group$.debounceTime(500))

const subscribe = example.subscribe(val => console.log(val));

Output:

[object Object] {
  bid: 11,
  id: 25
}
[object Object] {
  bid: 25,
  id: 30
}

Jsbin: http://jsbin.com/savahivege/edit?js,console

This code will group by the bid ID and debounce on that, so therefore only send the last values for each.

Hyperextension answered 4/11, 2017 at 12:29 Comment(4)
This works great, I'm using it with RxJS 6+ and NGRX actions!Childbirth
@DanielB can you please share your code in RxJS 6+ ?Lyn
@LukeKroon Yes, I posted an answer: https://mcmap.net/q/1046059/-rxjs-debounce-a-stream-only-if-distinctChildbirth
If the readers use case involves a long lived source and/or a lot of data, then be careful because groupBy will keep track of each ID it's ever seen, ever, potentially being a memory leak. It accepts an argument for a duration selector to mitigate this somewhat.Trixie
T
5

I'm not aware of any way to do this with without creating your own operator because you need to maintain some sort of state (the last seen value).

One way looks something like this:

// I named this debounceDistinctUntilChanged but that might not be
// the best name. Name it whatever you think makes sense!

function debounceDistinctUntilChanged(delay) {
  const source$ = this;

  return new Observable(observer => {
    // Using an object as the default value
    // so that the first time we check it
    // if its the same its guaranteed to be false
    // because every object has a different identity.
    // Can't use null or undefined because source may
    // emit these!
    let lastSeen = {};

    return source$
      .debounce(value => {
        // If the last value has the same identity we'll
        // actually debounce
        if (value === lastSeen) {
          return Observable.timer(delay);
        } else {
          lastSeen = value;
          // This will complete() right away so we don't actually debounce/buffer
          // it at all
          return Observable.empty();
        }
      })
      .subscribe(observer);
  });
}

Now that you see an implementation you may (or may not) find it differs from your expectations. Your description actually left out certain details, like if it should only be the last value you keep during the debounce time frame or if it's a set--basically distinctUntilChanged vs. distinct. I assumed the later.

Either way hopefully this gives you a starting point and reveals how easy it is to create custom operators. The built in operators definitely do not provide solutions for everything as-is, so any sufficiently advanced app will need to make their own (or do the imperative stuff inline without abstracting it, which is fine too).

You can then use this operator by putting it on the Observable prototype:

Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;

// later
source$
  .debounceDistinctUntilChanged(400)
  .subscribe(d => console.log(d));

Or by using let:

// later
source$
  .let(source$ => debounceDistinctUntilChanged.call($source, 400))
  .subscribe(d => console.log(d));

If you can, I recommend truly understanding what my code does, so that in the future you are able to easily make your own solutions.

Trixie answered 15/6, 2017 at 18:42 Comment(1)
There is a critical bug with this solution: if two values are rapidly fired and the first is different from the second, only the second value will be emitted!Anesthetize
C
4

Providing an answer for RxJS 6+ with the method suggested by @samberic in an ngrx effect to group actions coming from a same source id with RxJS 6.

this.actions$.pipe(
    ofType(actionFoo, actionBar), // Two different ngrx action with an id property
    groupBy(action => action.id), // Group by the id from the source
    mergeMap(action => action.pipe(
        debounceTime(5000)
    ))
).pipe(
    // Do whatever it is that your effect is supposed to do!
)
Childbirth answered 28/10, 2020 at 13:19 Comment(1)
Here is another answer with similar results for reference https://mcmap.net/q/1161358/-group-and-debounce-observableLyn
G
3

Here is my RXJS 6+ version in typescript that works 100% as originally requested. Debounce (restart timer) on every new source value. Emit value only if the new value is different from the previous value or the debounce time has expired.

// custom rxjs operator to debounce while the source emits the same values
debounceDistinct<T>(delay: number) {
    return (source: Observable<T>): Observable<T> => {
        return new Observable(subscriber => {
            let hasValue = false;
            let lastValue: T | null = null;
            let durationSub: Subscription = null;

            const emit = () => {
                durationSub?.unsubscribe();
                durationSub = null;
                if (hasValue) {
                    // We have a value! Free up memory first, then emit the value.
                    hasValue = false;
                    const value = lastValue!;
                    lastValue = null;
                    subscriber.next(value);
                }
            };

            return source.subscribe(
                (value: T) => {
                    // new value received cancel timer
                    durationSub?.unsubscribe();
                    // emit lastValue if the value has changed
                    if (hasValue && value !== lastValue) {
                        const value = lastValue!;
                        subscriber.next(value);
                    }
                    hasValue = true;
                    lastValue = value;
                    // restart timer
                    durationSub = timer(delay).subscribe(() => {
                        emit();
                    });
                },
                error => {
                },
                () => {
                    emit();
                    subscriber.complete();
                    lastValue = null;
                });
        });
    }
}
Galvan answered 18/2, 2021 at 9:51 Comment(1)
This is the correct answer, kudos!Anesthetize
B
3

Another possibility, not sure if supported with rxjs5 though:

source$.pipe(
  pairwise(),
  debounce(([a, b]) => {
    if (a === b) {
      return interval(1000)
    }
    return of();
  }),
  map(([a,b]) => b)
)
.subscribe(console.log);

https://stackblitz.com/edit/typescript-39nq7f?file=index.ts&devtoolsheight=50

Boor answered 2/4, 2021 at 12:45 Comment(0)
S
0

update for rxjs 6 :

 source$
.pipe(
        // debounceTime(300),  optionally un-comment this to add debounce
        distinctUntilChanged(),
    )
.subscribe(v => console.log(v))
Sustentation answered 28/8, 2018 at 10:13 Comment(2)
Hello Nir, an answer that is relevant for rxjs 6 would indeed be helpful. However, this code will debounce all the time. I'm looking for an approach that will only debounce if the value has not changed, if it has changed - I don't want it to debounce.Swum
you're right, I've edited the answer to better reflect the original questionSustentation
P
0

This rxjs6+ operator will emit when the source 'value' has changed or when some 'delay' time has passed since last emit (even if 'value' has not changed):

export function throttleUntilChanged(delay: number) {
  return (source: Observable<any>) => {
    return new Observable(observer => {

      let lastSeen = {};
      let lastSeenTime = 0;

      return source
        .pipe(
          flatMap((value: any) => {
            const now = Date.now();
            if (value === lastSeen && (now - lastSeenTime) < delay ) {
              return empty();
            } else {
              lastSeen = value;
              lastSeenTime = now;
              return of(value);
            }
          })
        )
        .subscribe(observer);
    });
  };
}
Plank answered 12/2, 2019 at 12:25 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.