Conditional emission delays with rxjs

a-thousand-words

From picture to code?

How to get the Out observable from Data and Gates?

  • Data is an observable of any kind e.g. JSON objects to be sent to a remote backend
  • Gates is a boolean observable, where the ticks correspond to true and the crosses to false. For example, Internet connectivity whereby true means the network became accessible and false reflects a disconnection.
  • Out is the resulting observable, which emits the same as Data, sometimes immediately, sometimes with a delay, depending on the gate that preceded. For instance, I could subscribe to the Out in order to post the emitted JSON objects to a remote API while connected to the Internet.
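One way to read the three bullets above, as a minimal sketch in plain JavaScript with no RxJS involved: each item is emitted at its own arrival time if the most recent gate was a tick, and otherwise at the time of the next tick. The function name `expectedOut`, the `{ t, value }` and `{ t, open }` event shapes, and the `initiallyOpen` parameter are all hypothetical, introduced only for illustration; the question does not say what should happen before the first gate event, so that is left to the caller.

```javascript
// Plain-JavaScript sketch of the intended semantics (not an RxJS solution).
// data:  [{ t: number, value: any }]     sorted by arrival time t
// gates: [{ t: number, open: boolean }]  sorted by time t (tick = true, cross = false)
// initiallyOpen: assumed gate state before the first gate event (an assumption)
function expectedOut(data, gates, initiallyOpen) {
  const out = [];
  for (const d of data) {
    // Gate state in force when the item arrives: last gate event at or before d.t.
    const before = gates.filter(g => g.t <= d.t);
    const open = before.length ? before[before.length - 1].open : initiallyOpen;
    if (open) {
      out.push({ t: d.t, value: d.value }); // emitted immediately
      continue;
    }
    // Gate closed: the item waits for the next tick. If no tick ever
    // follows, the item is never emitted in this sketch.
    const nextTick = gates.find(g => g.t > d.t && g.open);
    if (nextTick) out.push({ t: nextTick.t, value: d.value });
  }
  // Emission times are non-decreasing here, so the original order is preserved.
  return out;
}

const out = expectedOut(
  [{ t: 1, value: 'a' }, { t: 3, value: 'b' }, { t: 4, value: 'c' }, { t: 7, value: 'd' }],
  [{ t: 0, open: true }, { t: 2, open: false }, { t: 5, open: true }],
  false
);
console.log(out.map(e => `${e.value}@${e.t}`).join(' ')); // a@1 b@5 c@5 d@7
```

Here 'b' and 'c' arrive after the cross at t=2 and are held until the tick at t=5, while 'a' and 'd' pass straight through.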
Takeover answered 11/5, 2017 at 10:28 Comment(7)
So what exactly is your question? How to implement Gates? - Stgermain
@Stgermain How to get the Out observable from Data and Gates? - Takeover
You can use something called withLatestFrom: rxmarbles.com/#withLatestFrom - Tamera
Have a look at bufferWhen, bufferToggle or combineLatest - Boracite
To me your diagram looks like a filter where gates is an external dependency - Stgermain
Some of your suggestions do not exist in rxjs v5 any more and others do not work for what I'm looking for, as far as I can see... or prove me wrong with code :) - Takeover
I'll try to come up with something later today but if I don't, it's because I couldn't find a better answer than user3743222 :) - Teferi
T
1

Inspired by contributions to this post, the following seems to yield the desired behaviors:

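// gates$ and data$ are assumed to be hot (shared), since each is subscribed to several times
const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
// while the gate is open (from a tick until the next cross), let data through as it arrives
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
// while the gate is closed (from a cross until the next tick), collect data into an array
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
// flatten each buffered array back into individual items
const crossedData$ = crossedDataBuffers$.concatMap(buffer => Rx.Observable.from(buffer))
const out$ = tickedData$.merge(crossedData$)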

It could possibly be made simpler; have a play at https://jsfiddle.net/KristjanLaane/6kbgnp41/

Takeover answered 12/5, 2017 at 13:46 Comment(0)
P
3

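Another way to conditionally delay data$ is to use delayWhen(), like so:

import { BehaviorSubject } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

// The gate starts closed; a BehaviorSubject replays its latest value to every new subscriber.
const gate$ = new BehaviorSubject<boolean>(false);
// Each item is held until gate$ holds, or next emits, true.
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$.pipe(delayWhen(triggerF));
out$.subscribe(v => console.log(v));

// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);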
Pronty answered 5/2, 2019 at 21:29 Comment(0)
A
2

From what I understand, you need data$ when gates$ emits true, and buffering of data$ otherwise, ending when gates$ emits true again, so something like:

out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))

Hypothesis: data$ and gates$ are hot streams (for what that means, see "Hot and Cold observables: are there 'hot' and 'cold' operators?").

This is not tested, but try it, and let us know if it indeed works (or prove it with code as you say :-). The logic looks ok, I am just unsure about the re-entrant gates$. Hopefully the inner gates$ subscription from buffer fires before the outer one. If that does not happen you will see a pause in the emission of data corresponding to network downtime.

Alright, if that does not work, then the standard solution with scan will. The behavior you seek can be expressed as a (tiny) state machine with two states: passthrough and buffering. You can implement all such state machines with scan.
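As a rough sketch of that two-state machine on its own, here it is in plain JavaScript with no RxJS involved. The names `step`, `run` and `mode` are made up for illustration, the events are assumed to be either `{ data: value }` or `{ gates: boolean }`, and a loop over an array stands in for scan followed by flattening.

```javascript
// Plain-JavaScript sketch of the passthrough/buffering state machine.
const initialState = { mode: 'passthrough', buffer: [], out: [] };

// Pure transition: (state, event) -> next state; `out` holds the items to emit now.
function step(state, event) {
  if ('data' in event) {
    return state.mode === 'passthrough'
      ? { mode: 'passthrough', buffer: [], out: [event.data] }
      : { mode: 'buffering', buffer: [...state.buffer, event.data], out: [] };
  }
  if (event.gates) {
    // Gate opens (or stays open): flush whatever was buffered, possibly nothing.
    return { mode: 'passthrough', buffer: [], out: state.buffer };
  }
  // Gate closes (or stays closed): keep the buffer, emit nothing.
  return { mode: 'buffering', buffer: state.buffer, out: [] };
}

// Array stand-in for scan: apply step to each event and collect what is emitted.
function run(events) {
  const emitted = [];
  let state = initialState;
  for (const event of events) {
    state = step(state, event);
    emitted.push(...state.out);
  }
  return emitted;
}

// Logs the items in order: a, b, c, d ('b' and 'c' are released by the tick).
console.log(run([
  { data: 'a' }, { gates: false }, { data: 'b' }, { data: 'c' },
  { gates: false }, { gates: true }, { data: 'd' },
]));
```

Because `step` never mutates its input state, a repeated cross or a repeated tick is harmless: the buffer is simply carried over or flushed empty.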

Here is the scan solution: https://jsfiddle.net/1znvwyzc/

const gates$ = Rx.Observable.interval(2000)
                            .map(_ => Math.random() >= 0.5)
                            .map(x => ({gates: x}))
                            .share()

const data$ = Rx.Observable.interval(500)
                           .map(_ => "data"+ _)
                           .map(x => ({data: x}))                           
                           .share()

const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
  if (acc.controlState === 'passthrough'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'passthrough',
        bufferedData : [],
        out : val.data
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates passing from true to true -> no changes to perform
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : null
        }
      } else {
        // gates passing from true to false, switch control state
        return {
          controlState : 'buffered',
          bufferedData : [],
          out : null
        }
      }      
    }
  }
  if (acc.controlState === 'buffered'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'buffered',
        bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
        out : null              
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates from false to true -> switch control state and pass the buffered data
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : acc.bufferedData              
        }
      } else {
        // gates from false to false -> nothing to do
        return {
          controlState : 'buffered',
          bufferedData : acc.bufferedData,
          out : null                    
        }
      }
    }
  }
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))

out$.subscribe(_ => console.log(_))   

You can see the exact same technique used here: How do I conditionally buffer key input based on event in RxJs

Arrant answered 11/5, 2017 at 13:45 Comment(4)
I find this question very interesting and this is the best answer so far :) One thing though: I think in the buffer at the end you should pass gates$.distinctUntilChanged(), because an emission of the same value would break the buffer (I guess). - Teferi
Yeah, you are right. To keep it simple, I am going to suppose that distinctUntilChanged has already been applied to gates$. - Arrant
The switchMap & buffer solution does not seem to fit the bill unfortunately: I don't see the missed data being emitted after a tick. Tested with jsfiddle.net/KristjanLaane/77qL0f1n - Takeover
Then check the included solution with scan. - Arrant
F
0
const gate$ = Rx.Observable.interval(2000)
                           .map(_ => Math.random() >= 0.5)
                           .filter(_ => _)


const data$ = Rx.Observable.interval(500)
                            .map(_ => "data"+ _)
                            .buffer(gate$)
                            .flatMap(_ => Rx.Observable.from(_))

data$.subscribe(_ => console.log(_))                          

The gate stream produces random true and false values (e.g. the network is up or down). We emit only the true values from this stream.

Based on the truthy values from this stream we buffer our data stream.

See the fiddle. Don't forget to open the browser console :)

Fecundate answered 11/5, 2017 at 11:20 Comment(1)
I do not think this corresponds to the marble diagram I'm after. One of the key differences is that with your code (1) gets emitted in Out after the second tick in Gates, but (1) should be emitted in Out immediately. - Takeover
