How to join two event streams on a common event property?

Consider the following two event streams. Every event has a timestamp (ts) and a value property.

[Image: input streams]

I want to combine these two streams into a resulting stream that pairs events with the same timestamp and applies a transformation to their values. If a timestamp is missing from one of the streams (such as the yellow ts=3 in the example below), that timestamp should be disregarded.

[Image: example diagram]
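
To make the intended result concrete, here is a minimal sketch in plain JavaScript that uses arrays in place of streams. The sample events, the helper name joinOnTimestamp and the sum used as the transformation are assumptions for illustration only.

```javascript
// Hypothetical sample data: each event has a timestamp (ts) and a value.
const streamA = [
  { ts: 1, value: 10 },
  { ts: 2, value: 20 },
  { ts: 3, value: 30 },
  { ts: 4, value: 40 },
];
const streamB = [
  { ts: 1, value: 1 },
  { ts: 2, value: 2 },
  { ts: 4, value: 4 }, // ts=3 is missing in this stream
];

// Join two arrays of events on ts and apply `transform` to the paired values.
// Events whose ts appears in only one input are dropped.
function joinOnTimestamp(a, b, transform) {
  const byTs = new Map(b.map(event => [event.ts, event]));
  return a
    .filter(event => byTs.has(event.ts))
    .map(event => ({
      ts: event.ts,
      value: transform(event.value, byTs.get(event.ts).value),
    }));
}

const expected = joinOnTimestamp(streamA, streamB, (x, y) => x + y);
console.log(expected);
// [ { ts: 1, value: 11 }, { ts: 2, value: 22 }, { ts: 4, value: 44 } ]
```

The question is how to get the same result when the events arrive over time rather than all at once.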

I would like to solve the problem using a reactive programming library such as xstream or rxjs. I'm quite new to the concepts of reactive programming, so if someone has another suggestion I'm all ears. Thanks!

Prosopopoeia answered 26/4, 2016 at 18:31 Comment(5)
I just love your diagrams! About your issue: it is difficult in general, because you need to see the whole stream to do the matching, unless you can guarantee ordering (i.e. the timestamp is an increasing function of time, so no values arrive out of order). Otherwise it could maybe be done using the join operator. - Continually
Thanks :) We can probably assume that timestamps always come in increasing order. - Prosopopoeia
It would also be nice if all the values had different timestamps. - Continually
We can probably assume that too: all timestamps will be unique per input stream. - Prosopopoeia
Alright, I have no time to look at it now, but I would try something like s1.join(s2, s1Val => s1.skipUntil(s2.value = s1Val), timer(0)); that's just off the top of my head. The join function is pretty badly documented, but if you don't get an answer I can have a closer look. - Continually

Use combineLatest (called combine in xstream) and keep only those combinations whose timestamps match. Every other combination is mapped to null, and the nulls are filtered out afterwards.

Here's the solution in xstream:

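// Note: this uses the early xstream signature, with the projection function first.
// Newer releases expect xs.combine(streamA, streamB).map(([a, b]) => ...) instead.
var streamOut = xs.combine(
  (a, b) => {
    if (a.ts === b.ts) {
      // Latest events share a timestamp: emit the transformed pair.
      return {ts: a.ts, value: a.value + b.value};
    } else {
      // Timestamps differ: mark this combination for removal.
      return null;
    }
  },
  streamA, streamB
).filter(x => x !== null);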

See it running in JSBin: https://jsbin.com/saxawatuza/edit?js,console
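
One caveat worth keeping in mind: combine only pairs the latest event from each stream, so a match is found only while both streams sit on the same timestamp at the same moment. If one stream can run ahead of the other, a small buffer per stream avoids missed matches. The following is a library-agnostic sketch, not part of xstream or rxjs; it assumes timestamps are unique and increasing within each stream, as agreed in the comments on the question, and createTimestampJoiner, pushA and pushB are made-up names for illustration.

```javascript
// Hypothetical helper: joins two event sources on ts, regardless of arrival order.
// Assumes timestamps are unique and strictly increasing within each source.
function createTimestampJoiner(transform, onMatch) {
  // Events that are still waiting for a partner, keyed by ts.
  const pending = { a: new Map(), b: new Map() };

  function push(side, other, event) {
    const partner = pending[other].get(event.ts);
    if (partner === undefined) {
      // No partner yet: remember the event until the other side catches up.
      pending[side].set(event.ts, event);
    } else {
      pending[other].delete(event.ts);
      const [a, b] = side === 'a' ? [event, partner] : [partner, event];
      onMatch({ ts: event.ts, value: transform(a.value, b.value) });
    }
    // This side will never emit a smaller ts again, so older events
    // buffered on the other side can no longer find a partner.
    for (const ts of pending[other].keys()) {
      if (ts < event.ts) pending[other].delete(ts);
    }
  }

  return {
    pushA: event => push('a', 'b', event),
    pushB: event => push('b', 'a', event),
  };
}

// Usage: stream A runs ahead of stream B, and B never emits ts=3.
const results = [];
const joiner = createTimestampJoiner((x, y) => x + y, r => results.push(r));
[1, 2, 3, 4].forEach(ts => joiner.pushA({ ts, value: ts * 10 }));
[1, 2, 4].forEach(ts => joiner.pushB({ ts, value: ts }));
console.log(results);
// [ { ts: 1, value: 11 }, { ts: 2, value: 22 }, { ts: 4, value: 44 } ]
```

With a reactive library, pushA and pushB would be called from the listeners of the two input streams, and onMatch would feed the output stream.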

Mind answered 26/4, 2016 at 19:47 Comment(2)
An unrelated comment, Andre: the OP used the xstream tag, which is unrelated to the xstream streaming library. It would be nice to have a specific tag for that. - Continually
@Continually I agree. - Pepito