How to do distinct throttle in RxJS
Asked Answered
A

2

5

I have spent few days but could not find a way to do "distinct throttle" in RxJS.

Assume each event completes in 4 dashes, a "distinct throttle" will perform as follows:

-①-②-①---------①-----|->

[distinct throttle]

-①-②-------------①-----|->

How can I use existing RxJS operators to build a "distinct throttle"?

Anvers answered 6/7, 2018 at 21:59 Comment(0)
B
8

You can use groupBy to separate the notifications by value and can then apply throttleTime and can then merge the grouped observables using mergeMap. Like this:

const { Subject } = rxjs;
const { groupBy, mergeMap, throttleTime } = rxjs.operators;

const source = new Subject();
const result = source.pipe(
  groupBy(value => value),
  mergeMap(grouped => grouped.pipe(
    throttleTime(400)
  ))
);

result.subscribe(value => console.log(value));

setTimeout(() => source.next(1), 100);
setTimeout(() => source.next(2), 300);
setTimeout(() => source.next(1), 400);
setTimeout(() => source.next(1), 900);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Balladry answered 6/7, 2018 at 23:10 Comment(1)
Thank you, that's exactly what I want!Anvers
P
0

distinct and throttle have 2 different characteristics regarding item pick. distinct will pick the first item while throttle will pick the last.

Sometimes you want to keep throttle's behavior.

Let's say the stream is: chat-message-edit events carrying the updated text. A user may edit a specific message multiple times within the throttle period.
You want to be sure that you always keep the last version of each message (among a stream of edits of differrent messages ).

A possible solution I would follow for this is the one below

const source$ = from([
{id:1,content:"1a"}, 
{id:1,content:"1b"}, 
{id:1,content:"1c"}, 
{id:2,content:"2a"}, 
{id:2,content:"2b"}, 
{id:3,content:"3a"},
{id:3,content:"3b"},
{id:1,content:"1d"},
{id:1,content:"1e"},
{id:4,content:"4a"},
{id:4,content:"4b"},
{id:4,content:"4c"},
{id:4,content:"4e"},
{id:4,content:"4f"},
{id:3,content:"3c"},
{id:3,content:"3d"},
{id:3,content:"3e"}
]).pipe(concatMap((el)=> of(el).pipe(delay(500)) ));



const distinctThrottle = (throttleTime, keySelector)=>
    pipe(bufferTime(throttleTime),
          concatMap((arr)=>from(arr.reverse()).pipe(distinct(keySelector))  
        )) ;


let throttledStream = source$.pipe(distinctThrottle(1550, ({id})=>id));

throttledStream.subscribe(console.log);
Pianola answered 3/12, 2019 at 13:6 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.