DebounceTime after first value in RxJS
Asked Answered
T

4

6

I need a specific behavior that I can't get with the RxJS operators. The closest would be to use DebounceTime only for values entered after the first one, but I can't find a way to do it. I have also tried with ThrottleTime but it is not exactly what I am looking for, since it launches intermediate calls, and I only want one at the beginning that is instantaneous, and another at the end, nothing else.

ThrottleTime

throttleTime(12 ticks, { leading: true, trailing: true })

source:             --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output:             --0-----------3-----------6-----------7-----------9--------


source_2:           --0--------1------------------2--------------3---4---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2:           --0-----------1---------------2--------------3-----------4-

DebounceTime

debounceTime(500)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             -----------1--------3--------------------------------13---------

What I want

debounceTimeAfterFirst(500) (?)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             --0--------1--3------------4-------------------------13---------

As you see, the debounce time is activated when a new value is entered. If the debounce time passes and any new value has been entered, it stops the listening the debounceTime action and waits to start a new one.

Edit: I forgot to comment that this must be integrated with NgRx’s Effects, so it must be a continuous stream that mustn't be completed. Terminating it would probably cause it to stop listening for dispatched actions.

Turbojet answered 7/2, 2022 at 10:2 Comment(2)
So, in your example, why do 0 and 4 not start debounce_interval, but 1, 3, an 13 do? What's happening with 5-11?Weatherwise
It's the normal behaviour for DebounceTime rxjs.dev/api/operators/debounceTimeTurbojet
K
6

I would use a throttle combined with a debounceTime:

  • throttle: from Documentation Emit value on the leading edge of an interval, but suppress new values until durationSelector has completed.

  • debounceTime: from Documentation Discard emitted values that take less than the specified time between output.

I would use a throttle stream to get the raising edge (the first emission) and then the debounce stream would give us the falling edge.

const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
  pluck('target', 'value')
);

const debounced = source.pipe(
  debounceTime(4000),
  map((v) => `[d] ${v}`)
);

const effect = merge(
  source.pipe(
    throttle((val) => debounced),
    map((v) => `[t] ${v}`)
  ),
  debounced
);

effect.subscribe(console.log);

See RxJS StackBlitz with the console open to see the values changing.

I prepared the setup to adapt it to NgRx which you mention. The effect I got working is:

@Injectable({ providedIn: 'root' })
export class FooEffects {
  switchLight$ = createEffect(() => {
    const source = this.actions$.pipe(
      ofType('[App] Switch Light'),
      pluck('onOrOff'),
      share()
    );
    const debounced = source.pipe(debounceTime(1000), share());
    return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
      map((onOrOff) => SetLightStatus({ onOrOff }))
    );
  });

  constructor(private actions$: Actions) {}
}

See NgRx StackBlitz with the proposed solution working in the context of an Angular NgRx application.

  • share: This operator prevents the downstream paths to simultaneously fetch the data from all the way up the chain, instead they grab it from the point where you place share.

I also tried to adapt @martin's connect() approach. But I don't know how @martin would "reset" the system so that after a long time if a new source value is emitted would not debounce it just in the same manner as you first run it, @martin, feel free to fork it and tweak it to make it work, I'm curious about your approach, which is very smart. I didn't know about connect().

@avicarpio give it a go on your application and let us know how it goes :)

Kathrinkathrine answered 7/2, 2022 at 19:33 Comment(0)
A
0

I think you could do it like the following, even though I can't think of any easier solution right now (I'm assuming you're using RxJS 7+ with connect() operator):

connect(shared$ => shared$.pipe(
  exhaustMap(value => merge(
    of(value),
    shared$.pipe(debounceTime(1000)),
  ).pipe(
    take(2),
  )),
)),

Live demo: https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts

connect() will share the source Observable and lets you reuse it in its project function multiple times. I'm using it only to use the source Observable inside another chain.

exhaustMap() will ignore all next notifications until its inner Observable completes. In this case the inner Observable will immediately reemit the current value (of(value)) and then use debounceTime(). Any subsequent emission from source is ignored by exhaustMap() because the inner Observable hasn't completed yet but is also passed to debounceTime(). Then take(2) is used to complete the chain after debounceTime() emits and the whole process can repeat when source emits because exhaustMap() won't ignore the next notification (its inner Observable has completed).

Adsorbent answered 7/2, 2022 at 10:45 Comment(2)
Sorry, I forgot to say that this needs to be inside an effect, so I can't make this works in my code. I edited the main thread with that info. Thanks for the reply anyway, and if you come up with anything I'll be glad to read it :)Turbojet
Why you can't use it in an effect? The outer chain doesn't get completed. You can see that in the demo link. If it did complete then it wouldn't work repeatedly but you can see it does.Adsorbent
W
0

Here's a custom operator that (as far s I can tell) does what you're after.

The two key insights here are:

  1. Use connect so that you can subscribe to the source twice, once to ignore emissions with exhaustMap and another to inspect and debounce emissions with switchMap
  2. Create an internal token so that you know when to exit without a debounced emission. (Insures that from your example above, the 4 is still emitted).
function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
  // Use this token's memory address as a nominal token
  const resetToken = {};

  return connect(s$ => s$.pipe(
    exhaustMap(a => s$.pipe(
      startWith(resetToken),
      switchMap(b => timer(interval).pipe(mapTo(b))),
      take(1),
      filter<T>(c => c !== resetToken),
      startWith(a)
    ))
  ));
}

example:

of(1,2,3,4).pipe(
  throttleDebounceTime(500)
).subscribe(console.log);

// 1 [...0.5s wait] 4
Weatherwise answered 7/2, 2022 at 16:59 Comment(0)
W
0

this is the operator i wrote:

export function throttleDebounce<T>(time: number): OperatorFunction<T, T> {
  return (source: Observable<T>): Observable<T> => {
    const debounced = source.pipe(debounceTime(time), share());

    return merge(source.pipe(throttle(() => debounced)), debounced).pipe(map((value) => value));
  };
}
Wilser answered 28/2 at 9:22 Comment(1)
Thank you for contributing to the Stack Overflow community. This may be a correct answer, but it’d be really useful to provide additional explanation of your code so developers can understand your reasoning. This is especially useful for new developers who aren’t as familiar with the syntax or struggling to understand the concepts. Would you kindly edit your answer to include additional details for the benefit of the community?Oniskey

© 2022 - 2024 — McMap. All rights reserved.