Suppose I have two possibly infinite streams:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
I want to merge the streams and then map merged stream with slowish asynchronous operation (e.g. in Bacon with fromPromise
and flatMapConcat
).
I can combine them with merge
:
me = a12b3.c45d6.7e...
And then map
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
As you see greedier s2
streams gets advantage in the long run. This is undesired behaviour.
The merge behaviour is not ok, as I want to have some kind of backpressure to have more interleaved, "fair", "round-robin" merge. Few examples of desired behaviour:
s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
One way to think this is that s1
and s2
send tasks to the worker which can handle only one task at the time. With merge
and flatMapConcat
I'll get a greedy task manager, but I want more fair one.
I'd like to find a simple and elegant solution. Would be nice if it is easily generalisable for arbitrary amount of streams:
// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
Solution using RxJS or other Rx library is fine too.
Clarifications
Not zipAsArray
I don't want:
function roundRobinPromiseMap(streams, f) {
return Bacon.zipAsArray.apply(null, streams)
.flatMap(Bacon.fromArray)
.flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
Compare the example marble diagram:
s1 = a.....b..............c.......
s2 = ..1.2.3......................
mm = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
Yes I'll run into buffering issues
... but so will I with straightforward unfair one:
function greedyPromiseMap(streams, f) {
Bacon.mergeAll(streams).flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
Marble diagram
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
bufferWithTimeOrCount
, as far as I remember Bacon advocates moving from push to pull in your case using an iterable instead of streams - the relevant Bacon issue quotes Eric Meijer saying you shouldn't use observables for fast source and slow consumer - I tend to agree. I'd write an answer producing a merged stream but honestly I don't understand your interleaving logic well enough - I can take a short if you clarify that. – MarindamarinduqueconcatMap
betweenme
andmm
. In that case, yes,s2
takes advantage. But you can instead useflatMap
to allow concurrency. I believe this in Bacon isflatMapWithConcurrencyLimit()
. Another solution is toflatMap
promises ons1
andflatMap
promises ons2
separately, without merging, since merging has a "or" semantics anyway. Depends on what you are trying to achieve. – Aquacade