Splitting a scalaz-stream process into two child streams

Using scalaz-stream is it possible to split/fork and then rejoin a stream?

As an example, let's say I have the following code:

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(_ % 2 == 0).fold(0)(_ + _)
val sumOfOddNumbers  = streamOfNumbers.filter(_ % 2 != 0).fold(0)(_ + _)

sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)

With scalaz-stream, in this example the results would be as you expect: the sink receives a single tuple of the two sums, (30, 25).

However, if we replace streamOfNumbers with something that requires IO, the IO actions will actually execute twice: once for each consumer.
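
For example, with a source like the following, each of the two downstream folds runs the source independently, so every read happens twice (readNumberFromSomewhere is a hypothetical stand-in for a real IO call):

val streamOfNumbers: Process[Task, Int] =
  Process.repeatEval(Task.delay {
    println("reading")        // printed once per element, per consumer
    readNumberFromSomewhere() // hypothetical IO action
  }).take(10)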

Using a Topic I'm able to create a pub/sub process that duplicates elements in the stream correctly; however, it does not buffer: it simply consumes the entire source as fast as possible, regardless of the pace at which the sinks consume it.

I can wrap this in a bounded Queue, but the end result feels a lot more complex than it needs to be.

Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?

Triangulate asked 17/12, 2014 at 9:35

Also, to clarify: my other answer below deals with the "splitting" requirement. Your specific issue, however, may be solvable without splitting streams at all:

import scalaz.\/, scalaz.\/.{left, right}

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

// evens go right (the O side), odds go left (the W side)
val oddOrEven: Process[Task, Int \/ Int] = streamOfNumbers.map {
  case even if even % 2 == 0 => right(even)
  case odd                   => left(odd)
}

// sum each side separately: pipeW transforms the W (left) side, pipeO the O (right) side
val sump1  = process1.sum[Int]
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task, Int] = ???
val oddSink: Sink[Task, Int] = ???

// drainW sends the W (odd) side to its sink and drops it from the stream;
// `to` then consumes the remaining O (even) side
summed
  .drainW(oddSink)
  .to(evenSink)
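
For instance, with printing sinks in place of the ??? (purely illustrative), the whole pipeline can be run to completion:

val evenSink: Sink[Task, Int] =
  Process.constant((n: Int) => Task.delay(println(s"even sum so far: $n")))
val oddSink: Sink[Task, Int] =
  Process.constant((n: Int) => Task.delay(println(s"odd sum so far: $n")))

summed.drainW(oddSink).to(evenSink).run.run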
Roderick answered 18/12, 2014 at 8:25
I see a generic combinator in your answer: def partition[A](p: A => Boolean): Process1[A, A \/ A] = process1.lift(a => if (p(a)) right(a) else left(a)). Should it be added? – Oscoumbrian
Frank, I think we're missing some kind of more efficient partialTee for these use cases. But yeah, your proposal seems fine to me. I would like to see something like observeIf[A,B](pf: PartialFunction[A,B])(Sink[T,B]) perhaps. – Roderick
Thanks. My real scenario is a fair bit more complex, so it couldn't be simplified this way, but this gives me a few more threads to pluck at that might solve the issue. – Triangulate

You can perhaps still use a Topic, and just make sure that the child processes subscribe before you start pushing to the topic.

However, please note that this solution is unbounded: if you push faster than the subscribers consume, you may encounter an OOM error.

def split[A](source: Process[Task, A]): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val topic = async.topic[A]()

  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  // emit the two subscriptions while concurrently publishing the source to the topic
  merge.mergeN(Process(Process.emit(sub1 -> sub2), (source to topic.publish).drain))
}
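
As a hypothetical usage sketch, the question's rejoining could then be expressed roughly like this (streamOfNumbers and someEffectfulFunction are the question's own names):

split(streamOfNumbers).flatMap { case (s1, s2) =>
  val sumOfEven = s1.filter(_ % 2 == 0).fold(0)(_ + _)
  val sumOfOdd  = s2.filter(_ % 2 != 0).fold(0)(_ + _)
  sumOfEven zip sumOfOdd // emits a single (evenSum, oddSum) tuple
}.to(someEffectfulFunction)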
Roderick answered 18/12, 2014 at 8:16
For a dynamic number of splits, would a return type of Process[Task, Array[Process[Task,A]]] make any sense? Also, how might I modify this to use a Queue rather than a Topic and give unique stream elements to each child process? Thanks for your time. – Coleen

I likewise needed this functionality. My situation was quite a bit trickier, which kept me from working around it in this manner.

Thanks to Daniel Spiewak's response in this thread, I was able to get the following to work. I improved on his solution by adding onHalt so my application would exit once the Process completed.

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left  = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  // push every element to both queues; close them when the source halts
  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close)))(_ => Process.Halt(cause))
  }
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}
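
A sketch of how this might be used (the printing sinks are illustrative, not part of the solution); note that both children must be drained concurrently, or the bounded queues fill up and block the producer:

val sink1: Sink[Task, Int] = Process.constant((n: Int) => Task.delay(println(s"left: $n")))
val sink2: Sink[Task, Int] = Process.constant((n: Int) => Task.delay(println(s"right: $n")))

split(streamOfNumbers).flatMap { case (s1, s2) =>
  (s1 to sink1) merge (s2 to sink2) // drain both children concurrently
}.run.run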
Gaberones answered 2/9, 2016 at 21:53
