How to interleave streams (with backpressure)

Suppose I have two possibly infinite streams:

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...

I want to merge the streams and then map the merged stream with a slowish asynchronous operation (e.g. in Bacon with fromPromise and flatMapConcat).

I can combine them with merge:

me = a12b3.c45d6.7e...

And then map

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..

As you can see, the greedier s2 stream gets an advantage in the long run. This is undesired behaviour.


The merge behaviour is not OK, as I want some kind of backpressure to get a more interleaved, "fair", "round-robin" merge. A few examples of the desired behaviour:

s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...

s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...

One way to think about this is that s1 and s2 send tasks to a worker which can handle only one task at a time. With merge and flatMapConcat I'll get a greedy task manager, but I want a fairer one.


I'd like to find a simple and elegant solution. It would be nice if it were easily generalisable to an arbitrary number of streams:

// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);

A solution using RxJS or another Rx library is fine too.


Clarifications

Not zipAsArray

I don't want:

function roundRobinPromiseMap(streams, f) {
  return Bacon.zipAsArray.apply(null, streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}

Compare the example marble diagram:

s1  = a.....b..............c.......
s2  = ..1.2.3......................
mm  = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based

Yes I'll run into buffering issues

... but so will I with the straightforward unfair one:

function greedyPromiseMap(streams, f) {
  // return the mapped stream (the original snippet dropped the return)
  return Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}

Marble diagram

s1    = a.........b..........c...
s2    = ..1.2.3..................
mm    = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
Scholastic answered 30/12, 2014 at 14:35 Comment(5)
RxJS has things like bufferWithTimeOrCount. As far as I remember, Bacon advocates moving from push to pull in your case, using an iterable instead of streams. The relevant Bacon issue quotes Erik Meijer saying you shouldn't use observables for a fast source and a slow consumer, and I tend to agree. I'd write an answer producing a merged stream, but honestly I don't understand your interleaving logic well enough. I can take a shot if you clarify that. - Marindamarinduque
I also don't understand your desired interleaving. In the first example, I presume you are using concatMap between me and mm. In that case, yes, s2 takes advantage. But you can instead use flatMap to allow concurrency. I believe in Bacon this is flatMapWithConcurrencyLimit(). Another solution is to flatMap promises on s1 and flatMap promises on s2 separately, without merging, since merging has "or" semantics anyway. It depends on what you are trying to achieve. - Aquacade
The first example is an undesired behaviour. I made it explicit in the question. - Scholastic
I understood that in the first place. So doesn't my comment above solve the problem? - Aquacade
@OlegGrenrus I still don't understand your fairness criteria. Do you want a merge that allows at most Y events from s1 for every X events from s2, and vice versa? - Marindamarinduque

The core challenge here was to understand how to formalise fairness. In the question I already mentioned the worker analogy. It turned out that the obvious fairness criterion is to pick the stream that has generated fewer events than the others or, taken even further, the stream whose generated streams have been waited on for less time.
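To make the criterion concrete, here is a small library-free simulation (it is not the code from the gist). It assumes a single worker and a constant task duration of 4 ticks, and the names marble, simulate, greedy and fair are hypothetical helpers invented for this sketch. Cost is taken to be the worker time spent on each stream so far. Under those assumptions it reproduces the last marble diagram from the question: the plain merge order versus the fair order.

```javascript
// Turn a marble string like "a..b" into arrival records { t, stream, value }.
function marble(str, stream) {
  return [...str].flatMap((ch, t) => (ch === "." ? [] : [{ t, stream, value: ch }]));
}

// Greedy (plain merge): serve whichever queued event arrived first.
function greedy(queues) {
  let best = -1;
  queues.forEach((q, i) => {
    if (q.length > 0 && (best < 0 || q[0].seq < queues[best][0].seq)) best = i;
  });
  return best;
}

// Fair: serve the non-empty queue whose stream has cost the worker least so far
// (ties go to the stream with the lower index).
function fair(queues, costs) {
  let best = -1;
  queues.forEach((q, i) => {
    if (q.length > 0 && (best < 0 || costs[i] < costs[best])) best = i;
  });
  return best;
}

// Simulate one worker that handles a single task at a time.
// arrivals must be sorted by t; duration is the (assumed constant) task length.
function simulate(arrivals, streamCount, duration, pick) {
  const queues = Array.from({ length: streamCount }, () => []);
  const costs = new Array(streamCount).fill(0);
  const order = [];
  let next = 0; // index of the next arrival that is not queued yet
  let now = 0;
  while (next < arrivals.length || queues.some((q) => q.length > 0)) {
    // idle worker with nothing queued: jump ahead to the next arrival
    if (queues.every((q) => q.length === 0)) now = Math.max(now, arrivals[next].t);
    // enqueue everything that has arrived by now
    while (next < arrivals.length && arrivals[next].t <= now) {
      const a = arrivals[next];
      queues[a.stream].push({ value: a.value, seq: next });
      next++;
    }
    const i = pick(queues, costs);
    order.push(queues[i].shift().value);
    costs[i] += duration; // charge the work to the stream that caused it
    now += duration;
  }
  return order.join("");
}

const s1 = marble("a.........b..........c...", 0);
const s2 = marble("..1.2.3..................", 1);
const arrivals = s1.concat(s2).sort((x, y) => x.t - y.t);

console.log(simulate(arrivals, 2, 4, greedy)); // a123bc
console.log(simulate(arrivals, 2, 4, fair));   // a12b3c
```

With a constant task duration, the time-based cost and the simpler "fewest events so far" criterion pick the same stream; they only diverge when tasks take different amounts of time.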

With the criterion settled, it was quite trivial to formalise the desired output using denotational semantics: the code is on GitHub.

I didn't have time to develop the denotational combinators to include withStateMachine from Bacon.js, so the next step was to reimplement it in JavaScript with Bacon.js directly. The whole runnable solution is available as a gist.

The idea is to make a state machine with

  • per stream costs and queues as a state
  • streams and additional feedback stream as inputs

As the output of the whole system is fed back, we can dequeue the next event when the previous flatMapped stream has ended.
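The state machine idea can be sketched without Bacon as a pure transition function. This is an illustration only, not the gist's code: the event shapes ("input" and "done"), the step and run names, and the fixed cost of 4 per task are all assumptions made for the example. Each call returns the new state plus the tasks to hand to the worker, which is the same contract withStateMachine uses.

```javascript
// State:  { costs: number[], queues: any[][], running: boolean }
// Events: { type: "input", idx, value }  an event from input stream idx
//         { type: "done", idx, cost }    feedback: the worker finished a task of stream idx
// Returns [newState, outputs]; outputs are the tasks to start now.
function step(state, ev) {
  if (ev.type === "input") {
    if (!state.running) {
      // worker is idle: start the task right away
      return [{ ...state, running: true }, [{ idx: ev.idx, value: ev.value }]];
    }
    // worker is busy: enqueue the value on its stream's queue
    const queued = state.queues.map((q, i) => (i === ev.idx ? q.concat([ev.value]) : q));
    return [{ ...state, queues: queued }, []];
  }

  // "done": charge the finished task to the stream that produced it
  const costs = state.costs.map((c, i) => (i === ev.idx ? c + ev.cost : c));

  // pick the non-empty queue with minimal cost (ties go to the lower index)
  let best = -1;
  state.queues.forEach((q, i) => {
    if (q.length > 0 && (best < 0 || costs[i] < costs[best])) best = i;
  });
  if (best < 0) {
    // nothing is waiting: the worker becomes idle
    return [{ ...state, costs, running: false }, []];
  }
  const queues = state.queues.map((q, i) => (i === best ? q.slice(1) : q));
  return [{ costs, queues, running: true }, [{ idx: best, value: state.queues[best][0] }]];
}

// Fold a list of events through the state machine and collect the started tasks.
function run(events) {
  let state = { costs: [0, 0], queues: [[], []], running: false };
  const started = [];
  for (const ev of events) {
    const [next, outputs] = step(state, ev);
    state = next;
    outputs.forEach((o) => started.push(o.value));
  }
  return started.join("");
}

// The first "desired behaviour" example from the question, written out by hand.
const input = (idx, value) => ({ type: "input", idx, value });
const done = (idx) => ({ type: "done", idx, cost: 4 });
console.log(run([
  input(0, "a"), input(1, "1"), input(1, "2"),
  done(0), input(0, "b"), input(1, "3"),
  done(1), done(0), done(1), done(1),
  input(0, "c"),
])); // a1b23c
```

In the Bacon version the "done" events are not written by hand; they come from the output stream being fed back in as an extra input.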

To feed the output back in, I had to make a slightly ugly rec combinator:

function rec(f) {
  var bus = new Bacon.Bus();
  var result = f(bus);
  bus.plug(result);
  return result;
}

Its type is (EventStream a -> EventStream a) -> EventStream a. The type resembles other recursion combinators, e.g. fix.

Its system-wide behaviour could be better, as Bus breaks unsubscription propagation. That still needs work.

The second helper function is stateMachine, which takes an array of streams and turns them into a single state machine. Essentially it's .withStateMachine ∘ mergeAll ∘ zipWithIndex.

function stateMachine(inputs, initState, f) {
  var mapped = inputs.map(function (input, i) {
    return input.map(function (x) {
      return [i, x];
    })
  });
  return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
    if (p.hasValue()) {
      p = p.value();
      return f(state, p[0], p[1]);
    } else {
      return [state, p];
    }
  });
}

Using these two helpers we can write a not-so-complex fair scheduler:

function fairScheduler(streams, fn) {
  var streamsCount = streams.length;
  return rec(function (res) {
    return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
      // console.log("FAIR: " + JSON.stringify(state), i, x);

      // END event
      if (i == streamsCount && x.end) {
        var additionalCost = new Date().getTime() - x.started;

        // add cost to input stream cost center
        var updatedState = _.extend({}, state, {
          costs: updateArray(
            state.costs,
            x.idx, function (cost) { return cost + additionalCost; }),
        });

        if (state.queues.every(function (q) { return q.length === 0; })) {
          // if queues are empty, set running: false and don't emit any events
          return [_.extend({}, updatedState, { running: false }), []];
        } else {
          // otherwise pick a stream with
          // - non-empty queue
          // - minimal cost
          var minQueueIdx = _.chain(state.queues)
            .map(function (q, i) {
              return [q, i];
            })
            .filter(function (p) {
              return p[0].length !== 0;
            })
            .sortBy(function (p) {
              // use the costs that already include the task that just finished
              return updatedState.costs[p[1]];
            })
            .value()[0][1];

          // emit an event from that stream
          return [
            _.extend({}, updatedState, {
              queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
              running: true,
            }),
            [new Bacon.Next({
              value: state.queues[minQueueIdx][0],
              idx: minQueueIdx,
            })],
          ];
        }
      } else if (i < streamsCount) {
        // event from input stream
        if (state.running) {
          // if worker is running, just enqueue the event
          return [
            _.extend({}, state, {
              queues: updateArray(state.queues, i, function (q) { return q.concat([x]); }),
            }),
            [],
          ];
        } else {
          // if worker isn't running, start it right away
          return [
            _.extend({}, state, {
              running: true,
            }),
            [new Bacon.Next({ value: x, idx: i})],
          ]
        }
      } else {
        return [state, []];
      }

    })
    .flatMapConcat(function (x) {
      // map passed-through events,
      // and append a special "end" event
      return fn(x).concat(Bacon.once({
        end: true,
        idx: x.idx,
        started: new Date().getTime(),
      }));
    });
  })
  .filter(function (x) {
    // filter out END events
    return !x.end;
  })
  .map(".value"); // and return only value field
}

The rest of the code in the gist is quite straightforward.
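For readers without the gist at hand: fairScheduler refers to three helpers, append, updateArray and initialFairState, whose definitions are not shown here. The versions below are a guess inferred purely from how they are called above (the state shape with costs, queues and running is taken from that usage); the gist's actual definitions may differ.

```javascript
// Hypothetical helper definitions, inferred from usage in fairScheduler.

// append(xs, x): a new array with x added at the end; xs is not mutated.
function append(xs, x) {
  return xs.concat([x]);
}

// updateArray(arr, idx, f): a copy of arr where element idx is replaced by f(arr[idx]).
function updateArray(arr, idx, f) {
  return arr.map(function (x, i) {
    return i === idx ? f(x) : x;
  });
}

// initialFairState(n): zero cost and an empty queue for each of n streams, worker idle.
function initialFairState(n) {
  var costs = [];
  var queues = [];
  for (var i = 0; i < n; i++) {
    costs.push(0);
    queues.push([]);
  }
  return { costs: costs, queues: queues, running: false };
}

var state = initialFairState(2);
var queued = updateArray(state.queues, 1, function (q) { return q.concat(["x"]); });
console.log(JSON.stringify(queued));       // [[],["x"]]
console.log(JSON.stringify(state.queues)); // [[],[]]  (original left untouched)
```

Keeping these helpers non-mutating matters because the state machine callback builds each new state from the previous one with _.extend rather than modifying it in place.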

Scholastic answered 2/1, 2015 at 13:59 Comment(2)
Do you write this in Haskell and convert to JS? What does it take to understand all of this? Sorry, I am new, trying to learn FP, Haskell, monads etc. - Federation
I used Haskell as a thinking tool. I think in types, and Haskell helps me to get them right. In fact I had to console.log a lot while making the JavaScript version, to ensure I didn't mess up the data. - Scholastic

Here's a crazy chunk of code that might help.

It turns the input streams into a single stream of 'value' events, then merges them with 'send' events (and 'end' events for bookkeeping). Then, using a state machine, it builds up queues out of the 'value' events, and dispatches values on 'send' events.

Originally I wrote a roundRobinThrottle, but I've moved it to a gist.

Here is a roundRobinPromiseMap that is very similar. The code in the gist is tested, but this is not.

# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
roundRobinPromiseMap = (promiser, streams) ->
    # A bus to trigger new sends based on promise fulfillment
    promiseFulfilled = new Bacon.Bus()

    # Merge the input streams into a single, keyed stream
    theStream = Bacon.mergeAll(streams.map((s, idx) ->
        s.map((val) -> {
            type: 'value'
            index: idx
            value: val
        })
    ))
    # Merge in 'end' events
    .merge(Bacon.mergeAll(streams.map((s) ->
        s.mapEnd(-> {
            type: 'end'
        })
    )))
    # Merge in 'send' events that fire when the promise is fulfilled.
    .merge(promiseFulfilled.map({ type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
        {
            queues: streams.map(-> [])
            toPush: 0
            ended: 0
        }
        handleState

    )
    # Feed this output to the promiser
    theStream.onValue((value) ->
        Bacon.fromPromise(promiser(value)).onValue(->
            promiseFulfilled.push()
    ))

handleState = (state, baconEvent) ->
    outEvents = []

    if baconEvent.hasValue()
        # Handle a round robin event of 'value', 'send', or 'end'
        outEvents = handleRoundRobinEvent(state, baconEvent.value())
    else
        outEvents = [baconEvent]

    [state, outEvents]

handleRoundRobinEvent = (state, rrEvent) ->
    outEvents = []

    # 'value' : push onto queue
    if rrEvent.type == 'value'
        state.queues[rrEvent.index].push(rrEvent.value)
    # 'send' : send the next value by round-robin selection
    else if rrEvent.type == 'send'
        # Here's a sentinel for empty queues
        noValue = {}
        nextValue = noValue
        triedQueues = 0

        while nextValue == noValue && triedQueues < state.queues.length
            if state.queues[state.toPush].length > 0
                nextValue = state.queues[state.toPush].shift()
            state.toPush = (state.toPush + 1) % state.queues.length
            triedQueues++
        if nextValue != noValue
            outEvents.push(new Bacon.Next(nextValue))
    # 'end': Keep track of ended streams
    else if rrEvent.type == 'end'
        state.ended++

    # End the round-robin stream if all inputs have ended
    if roundRobinEnded(state)
        outEvents.push(new Bacon.End())

    outEvents

roundRobinEnded = (state) ->
    emptyQueues = allEmpty(state.queues)
    emptyQueues && state.ended == state.queues.length

allEmpty = (arrays) ->
    for a in arrays
        return false if a.length > 0
    return true
Luci answered 1/1, 2015 at 14:25 Comment(0)
