How do I "split" a stream in fs2?

I want to do something like this:

def splitStream[F[_], A, B](stream: Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B]) =
  (stream, stream.map(split))

But this does not work as it "pulls" from the source twice - once each when I drain both stream and stream.map(split). How do I prevent this? Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
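A minimal sketch of the double pull described above, assuming fs2 2.x with cats-effect 2. The `DoublePull` object and the `Ref` counter are hypothetical names introduced only for illustration; the counter records how often the source's effect runs when both streams are drained.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object DoublePull {
  // Counts how many times the source effect runs when both streams are drained.
  def pulls: IO[Int] =
    for {
      counter <- Ref.of[IO, Int](0)
      // Each element pulled from the source bumps the counter once.
      source   = Stream(1, 2, 3).covary[IO].evalTap(_ => counter.update(_ + 1))
      mapped   = source.map(_.toString) // stands in for stream.map(split)
      _       <- source.compile.drain   // first pull: 3 effects
      _       <- mapped.compile.drain   // second pull: the same 3 effects again
      n       <- counter.get
    } yield n
}
```

Running `DoublePull.pulls.unsafeRunSync()` should give 6 rather than 3, because a `Stream` is only a description of the work and each drain re-runs that description from the start.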

Ney answered 9/12, 2019 at 18:36 Comment(0)

Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?

Yes. E.g., you can use a queue from fs2:

import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))

Of course, the problem here is that if a caller ignores either stream, the other one will deadlock and never emit anything. This is the issue you generally run into when you try to split one stream into several and want every value guaranteed to appear in each substream, irrespective of when that substream is subscribed to.

The solution I usually go for is to combine larger actions and use operators like broadcast or parJoin:

def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  base.broadcastTo(runSeveralThings: _*).compile.drain

Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
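As a hedged usage sketch of the `broadcastTo` approach (again assuming fs2 2.x and cats-effect 2), the two consumers below record what they see into `Ref`s. `BroadcastSketch`, `record`, and `collect` are hypothetical names, and the global execution context is an assumption made only to obtain a `ContextShift` for the demo.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.{Pipe, Stream}
import scala.concurrent.ExecutionContext

object BroadcastSketch {
  // Concurrent[IO] needs a ContextShift; the global pool is an assumption for this demo.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // A consumer that appends every element it receives to the given Ref.
  private def record[A](ref: Ref[IO, Vector[A]]): Pipe[IO, A, Unit] =
    _.evalMap(a => ref.update(_ :+ a))

  // Runs the source once and feeds each element to both consumers.
  def collect[A, B](source: Stream[IO, A], split: A => B): IO[(Vector[A], Vector[B])] =
    for {
      raw    <- Ref.of[IO, Vector[A]](Vector.empty)
      mapped <- Ref.of[IO, Vector[B]](Vector.empty)
      pipes   = List[Pipe[IO, A, Unit]](
                  record(raw),                         // sees the original values
                  _.map(split).through(record(mapped)) // sees the split values
                )
      _      <- source.broadcastTo(pipes: _*).compile.drain
      r      <- raw.get
      m      <- mapped.get
    } yield (r, m)
}
```

For example, `BroadcastSketch.collect(Stream(1, 2, 3).covary[IO], (i: Int) => i.toString)` should yield both the original and the mapped values while pulling from the source only once. Both consumers are fixed up front, which is what avoids the ignored-stream problem.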

Louisiana answered 10/12, 2019 at 11:31 Comment(2)
Can we then somehow "duplicate" each A in stream and send one copy to stream and the other to stream.map(split)? Would this remove the need for an intermediate buffer? – Ney
@Ney the mental model is different: you can indeed "duplicate" things (stream.map(a => (a, a)) is the dumbest example, but broadcast "duplicates" too), but you cannot send things into a stream. – Louisiana
