Using scalaz-stream, is it possible to split/fork and then rejoin a stream?
As an example, let's say I have the following definitions:
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)
With scalaz-stream, in this example the results would be as you expect - a tuple of the two sums (of the even and of the odd numbers from 1 to 10) passed to a sink.
However, if we replace streamOfNumbers with something that requires IO, it will actually execute the IO action twice.
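To make the duplicated IO concrete, here is a minimal sketch of what I mean. The counter `reads` and the name `effectfulNumbers` are hypothetical stand-ins for a real IO source, and the final `.run` assumes a scalaz version where `Task#run` is available (newer releases use `unsafePerformSync` instead). It is written script/REPL style rather than as a full application.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz.concurrent.Task
import scalaz.stream.Process

// Hypothetical stand-in for real IO: counts how often the source is opened.
val reads = new AtomicInteger(0)

// Each time this process is run, the side effect fires before any number is emitted.
val effectfulNumbers: Process[Task, Int] =
  Process.eval_(Task.delay(reads.incrementAndGet())) ++ Process.emitAll(1 to 10)

// Both branches reference the same process value, but each one runs it independently.
val evens = effectfulNumbers.filter(_ % 2 == 0).fold(0)(_ + _)
val odds  = effectfulNumbers.filter(_ % 2 != 0).fold(0)(_ + _)

// Assumption: Task#run (scalaz 7.1); use unsafePerformSync on later versions.
val result = evens.zip(odds).runLog.run
```

Here `result` holds the single pair `(30, 25)`, but `reads.get` is 2 rather than 1, because zipping the two branches drives the source once per branch.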
Using a Topic, I'm able to create a pub/sub process that duplicates elements in the stream correctly. However, it does not buffer - it simply consumes the entire source as fast as possible, regardless of the pace at which the sinks consume it.
I can wrap this in a bounded Queue, however the end result feels a lot more complex than it needs to be.
Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?
def partition[A](p: A => Boolean): Process1[A, A \/ A] = process1.lift(a => if (p(a)) right(a) else left(a))
. Should it be added? – Oscoumbrian