RxJS throttle same value but let new values through
H

5

8

"Here you go," someone says, and you are handed this input stream of values, to which you more or less want to apply distinctUntilChanged()...

Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

Nothing weird so far.
But now someone says "it's okay" if the same value comes again, "but only if it's not too soon!" I want at least '----' ticks between occurrences of the same value. "Okay," you say, and you add a throttle:

import { Subject, interval } from 'rxjs';
import { throttle } from 'rxjs/operators';

const source = new Subject<number>();

// a mysterious cave troll randomly calls source.next(oneOrTwo)

const example = source.pipe(throttle(val => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

"That's not what I want! Look at all the values you missed," they say, pointing out that the throttle applies to every value in the stream, whatever the value is.

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

"Here, let me show you," the mysterious man says, and gives you this:

Wanted output

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'

My feeling is that combining window operators won't be enough.

To someone more experienced:
is this a hard problem to solve? (Or have I missed an obvious solution?)
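
The difference between the throttled diagram and the wanted one can be reproduced without RxJS by replaying a marble string tick by tick. The sketch below is a plain TypeScript model, not an operator: `throttleMarbles` and `keyOf` are hypothetical names, one character is one tick, and a value is let through only when more than `gap` ticks have passed since the last emission with the same key (one reading of "at least '----' between the same value"). A constant key models the plain throttle; keying on the value itself models the wanted behaviour.

```typescript
// Replays a marble string one tick per character.
// '-' (no value) and '|' (completion) are copied through unchanged.
function throttleMarbles(
  marbles: string,
  gap: number,
  keyOf: (value: string) => string,
): string {
  const lastEmit: Record<string, number> = {}; // key -> tick of last emission
  let out = '';
  for (let tick = 0; tick < marbles.length; tick++) {
    const ch = marbles.charAt(tick);
    if (ch === '-' || ch === '|') {
      out += ch;
      continue;
    }
    const key = keyOf(ch);
    const last: number | undefined = lastEmit[key];
    if (last === undefined || tick - last > gap) {
      lastEmit[key] = tick;
      out += ch; // emitted
    } else {
      out += '-'; // suppressed: same key was emitted too recently
    }
  }
  return out;
}

// Assumption: 4 ticks stands in for the 4000 ms used in the question.
const throttleAll = (m: string) => throttleMarbles(m, 4, () => 'any');
const throttleSame = (m: string) => throttleMarbles(m, 4, (v) => v);

console.log(throttleAll('1-1----11---2--1122----1---2---2-2-1-2-----|'));
console.log(throttleSame('1-1----11---2--1112----1---2---2-2-1-2-----|'));
```

Comparing the two printed lines with the diagrams above is a quick way to check that reading of the requirement.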

Hamid answered 5/12, 2018 at 0:1 Comment(1)
In your wanted output, the last '2' in the input should also be in the output. - Phyle
H
0

I found a solution that works, does someone have any take on this?

source.pipe(
   windowTime(4000),
   concatMap(obs => obs.pipe(distinct()))
);

Examples from before, in a StackBlitz example

UPDATE: this does not actually work 100%. It only takes the current window into consideration. So you can, for example, have

`[1-12][2---]` which would give `1--22---|`

where [----] represents the time window. In other words, if a value is emitted for the first time at the end of one window and again at the start of the next window, both copies pass through right after each other.

Thanks @eric99 for making me realize this.
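
The boundary case can be replayed by hand. The following sketch is a plain TypeScript model of the idea, not RxJS itself: `distinctPerWindow` is a hypothetical helper that walks a marble string (one character per tick), resets its memory every `size` ticks, and drops repeats only inside the current window.

```typescript
// Model of "distinct within each fixed time window" on a marble string.
// One character is one tick; '-' means no value.
function distinctPerWindow(marbles: string, size: number): string {
  let out = '';
  let seen: Record<string, boolean> = {};
  for (let tick = 0; tick < marbles.length; tick++) {
    if (tick % size === 0) {
      seen = {}; // a new window starts: forget everything seen so far
    }
    const ch = marbles.charAt(tick);
    if (ch === '-') {
      out += '-';
    } else if (seen[ch]) {
      out += '-'; // repeat inside the current window: suppressed
    } else {
      seen[ch] = true;
      out += ch;
    }
  }
  return out;
}

// Windows of four ticks: [1-12][2---]
console.log(distinctPerWindow('1-122---', 4)); // prints "1--22---"
```

Because the memory is cleared at every boundary, the '2' that closes the first window and the '2' that opens the second both get through.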

Hamid answered 5/12, 2018 at 9:57 Comment(3)
That's similar to my first attempt, except I used bufferTime() and mergeMap(). @FanCheung pointed out an error, so I deleted the answer. - Shiftless
The StackBlitz uses keypress as the source; it would be better to define precise timing, perhaps with a Subject and setTimeout to feed subject.next(). I have been looking for something that takes a marble diagram to define the source. - Shiftless
One thing this question shows: when complexity goes up, it is really tricky to get every detail correct without a solid test in place. - Shiftless
A
7

At first I tried to combine distinctUntilChanged() and throttleTime() somehow, but I could not come up with a solution that way, so I tried something else.

The operator I came up with is throttleDistinct(), which works the way you describe: StackBlitz Editor Link

It has 2 parameters:

  1. duration: number, in milliseconds, similar to duration in throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean, a function that compares the previous item with the next item; the default implementation is (a, b) => a === b

import { fromEvent, Observable } from 'rxjs';
import { map, scan, filter } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
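        scan((acc, cur) => {
          // acc is the last item that was kept, so diff is the time since the last emission
          const diff = cur.time - acc.time;

          const isSame = equals(acc.val, cur.val);
          // a repeat is kept only after `duration` has elapsed; a different value is kept at once
          return diff > duration || !isSame
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),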
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}
Acclivity answered 5/12, 2018 at 7:24 Comment(3)
+1, that's a really neat solution I would never have thought of. I found a solution to another problem which used a combined windowTime and concatMap, like .pipe( // throttleDistinct(1000), windowTime(1000), concatMap(obs => obs.pipe(distinctUntilChanged())) ), which I think accomplishes the same thing. However, neither solution meets the requirements, because both will produce an output of |1212121----|, which is not allowed. - Hamid
@Hamid Hey, I guess that when the input is |1212121----|, your desired output should be just |12---------|. Is my assumption right? Your description in the question doesn't really make this clear to me. - Acclivity
If you have |1212121----| then the output should be |12----1----|. There must be at least ---- between repeats of the same value, but new values must also be let through. If you have |1212112----| then the output is |12---12----|. - Hamid
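
The alternating case in these comments can be traced with a small model. The sketch below is plain TypeScript on marble strings (one character per tick), and `throttleAgainstLastKept` is a hypothetical name; it only mirrors the idea behind the scan() step, where each new item is compared with the single last item that was kept.

```typescript
// Model of comparing each value only with the last kept value.
// A different value always passes; a repeat passes only after `gap` ticks.
function throttleAgainstLastKept(marbles: string, gap: number): string {
  let lastVal = '';
  let lastTick = -1; // -1 means nothing has been kept yet
  let out = '';
  for (let tick = 0; tick < marbles.length; tick++) {
    const ch = marbles.charAt(tick);
    if (ch === '-') {
      out += '-';
      continue;
    }
    const isSame = lastTick >= 0 && ch === lastVal;
    if (!isSame || tick - lastTick > gap) {
      lastVal = ch;
      lastTick = tick;
      out += ch; // kept
    } else {
      out += '-'; // repeat of the last kept value, too soon
    }
  }
  return out;
}

console.log(throttleAgainstLastKept('1212121----', 4)); // prints "1212121----"
```

Alternating values replace the remembered item on every tick, so nothing is ever suppressed. Getting `12----1----` instead needs one remembered timestamp per value rather than a single one.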
S
1

Off the top of my head, you want to buffer by the time interval, then distinct within each buffer.

Effectively you want to restart / reboot the distinct run every n milliseconds.

source.pipe(
  bufferTime(ms),
  mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
Shiftless answered 5/12, 2018 at 2:50 Comment(8)
It misses identical values within the buffer time. - Appendix
Comment too short to understand. - Shiftless
When an identical value is emitted between throttle periods, it should go through too. Your code only emits distinct values. - Appendix
Not so, and there is no throttle. - Shiftless
Your code only emits distinct values. Look at the desired output: |1------1----2--1--2----1---2-----2-1-------| - Appendix
The requirement is more complicated than your answer: it emits when a new distinct value arrives, and it also emits when the throttle time has been reached. - Appendix
I'll put in a runnable snippet to illustrate. Your meaning is still not clear. - Shiftless
@FanCheung you are right, it doesn't work. The given output looks wrong too; it should be 1------1----2--1-2-----1---2---2---1-2----- if the rules are followed exactly. Specifically, two of the '2's can come a little earlier than shown. - Shiftless
S
1

This is my second attempt. It filters the stream by output value (rather than using distinctUntilChanged()), then throttles each filtered stream and merges the two.

Of course, we may not have a known set of values (1,2,...n).
If I can figure out that wrinkle, will add a further example.

const output = merge(
  source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
  source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)

Here is my check (ms = 4000)

input         1-1----11---2--1112----1---2---2-2-1-2-----
expected      1------1----2--1--2----1---2-----2-1-------

filter(1)     1-1----11------111-----1-----------1-------
throttle(1)   1------1-------1-------1-----------1-------

filter(2)     ------------2-----2--------2---2-2---2-----
throttle(2)   ------------2-----2--------2-----2---------

merged        1------1----2--1--2----1---2-----2-1-------
expected      1------1----2--1--2----1---2-----2-1-------

Extending to n values

I think this will work where the set of values in the stream is not known in advance (or has a large range so extending the previous answer is impractical).

It should work as long as the source completes.

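source.pipe(
  distinct(),
  // build one filtered, throttled stream per distinct value and merge them all;
  // note that each inner stream subscribes to source again
  mergeMap(distinctVal => source.pipe(
    filter(val => val === distinctVal),
    throttle(val => interval(ms))
  ))
)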

I don't have a proof yet, will post that next.
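
One way to avoid knowing the values in advance is to remember the last emission time per value. The following is a minimal sketch under assumptions: `createSameValueGate`, `keyOf` and the injectable `now` clock are hypothetical names, and the gate is only a stateful predicate in plain TypeScript. It could presumably be handed to RxJS filter(), but that wiring is not tested here. Letting a repeat through at exactly `minGapMs` is also an assumption about the boundary.

```typescript
// Returns a predicate that lets a value through unless the same key
// was let through less than `minGapMs` milliseconds ago.
function createSameValueGate<T>(
  minGapMs: number,
  keyOf: (value: T) => string = String,
  now: () => number = Date.now,
): (value: T) => boolean {
  const lastPassed: Record<string, number> = {}; // key -> time of last pass
  return (value: T): boolean => {
    const key = keyOf(value);
    const t = now();
    const last: number | undefined = lastPassed[key];
    if (last !== undefined && t - last < minGapMs) {
      return false; // same value, too soon
    }
    lastPassed[key] = t;
    return true;
  };
}

// Usage with a fake clock so the timing is deterministic.
let fakeTime = 0;
const gate = createSameValueGate<number>(4000, String, () => fakeTime);
const seenAt = (ms: number, value: number): boolean => {
  fakeTime = ms;
  return gate(value);
};

console.log(seenAt(0, 1));    // true: first 1
console.log(seenAt(1000, 2)); // true: first 2
console.log(seenAt(2000, 1)); // false: a 1 passed 2000 ms ago
console.log(seenAt(5000, 1)); // true: 5000 ms since the last 1 that passed
```

Unlike the per-value merge, this keeps a single subscription and grows one table entry per distinct key, so for unbounded value sets the table would need pruning.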

Shiftless answered 5/12, 2018 at 5:23 Comment(5)
+1, your first example is working towards the requirements; I think you just forgot (x => x === 1) in the filter operator. I will test the other example with a range of values to see how it works. It is quite inspiring to see all these attempts, the effort, and the different ways of thinking. Nice job! - Hamid
@erhise, please see this StackBlitz. There may be some flaws in there, but feel free to fork and change it. - Shiftless
@FanCheung, please see this StackBlitz. There may be some flaws in there, but feel free to fork and change it. - Shiftless
@erhise, I forgot to change '4000' to timeMarbles in your test; now it looks a lot better. - Shiftless
Actually, it looks pretty much like the output from my first attempt using bufferTime(). - Shiftless
A
0

Here is a tricky solution based on the theory of the operators, but I can't be sure it really works, because I would need to mock the source emissions first.

The idea is that the throttle and distinct streams always have their latest value cached, and zip makes sure they are always emitted as a pair; zip should emit whenever either stream emits, because both use shareReplay(1).

We always take the value emitted from distinctStream, even when the zip stream is triggered by the throttle, because distinctStream always has the last value cached.

const throttleStream = source.pipe(throttle(val => interval(4000)), shareReplay(1))
const distinctStream = source.pipe(distinctUntilChanged(), shareReplay(1))
zip(throttleStream, distinctStream).pipe(
   // zip emits [throttled, distinct] pairs; keep the distinct value
   map(([t, d]) => d)
)
Appendix answered 5/12, 2018 at 9:22 Comment(8)
I ran marbles for each step. I may be wrong, but I get 1-----------2--1--2----1-------2-----1-----. I don't really know what shareReplay(1) does here, so maybe it does work. - Shiftless
Would you be kind enough to add a marble diagram for throttleStream and distinctStream? - Shiftless
shareReplay will always emit the last emitted value, so zip will always emit both values. - Appendix
How does the output of source.pipe(throttle(val => interval(4000)),shareReplay(1)) differ from the output of source.pipe(throttle(val => interval(4000)))? - Shiftless
When you subscribe to it, you get a value immediately with shareReplay. It is like turning an observable into a BehaviorSubject. - Appendix
OK, I think I follow. Our test example emits '1' immediately, so for this scenario there is no difference in output? - Shiftless
I will try to mock your stream and test it out. - Appendix
Cheers. It is quite hard to find an online Rx playground that takes marbles for input. I ended up just manually working out the diagram at each step. - Shiftless