Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
Yes. E.g., you can use a queue from fs2:
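// fs2 2.x API; needs cats.effect.Concurrent, cats.implicits._, fs2.Stream, fs2.concurrent.Queue
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (
    stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), // mirror each element into the queue, close it at the end
    q.dequeue.map(split) // second stream reads whatever the first one pushed
  )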
Of course, the problem here is that if a caller ignores either stream, the other one will deadlock and never emit anything. This is generally the issue you run into when you split one stream into several and want every value guaranteed to appear in each substream, irrespective of when that substream is subscribed to.
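If you do go the queue route, both returned streams have to be run together. Here is a minimal sketch of that, assuming fs2 2.x with cats-effect 2 (newer versions changed these APIs) and assuming the second stream of splitStream is simply the queue's dequeue side mapped through split. The object name, the sample input and the Ref-based collectors are made up for illustration.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object SplitStreamDemo {
  // Concurrent[IO] needs a ContextShift in cats-effect 2.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Assumed shape of splitStream: the first stream mirrors its elements into
  // the queue, the second stream reads them back and applies `split`.
  def splitStream[F[_]: Concurrent, A, B](
    stream: Stream[F, A],
    split: A => B
  ): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (
        stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)),
        q.dequeue.map(split)
      )
    }

  // Runs both halves with `merge`, so neither side is left unsubscribed.
  val demo: IO[(List[Int], List[String])] =
    for {
      seenA <- Ref.of[IO, List[Int]](Nil)
      seenB <- Ref.of[IO, List[String]](Nil)
      _ <- splitStream(Stream(1, 2, 3).covary[IO], (i: Int) => s"#$i").flatMap {
             case (as, bs) =>
               as.evalMap(a => seenA.update(_ :+ a))
                 .merge(bs.evalMap(b => seenB.update(_ :+ b)))
                 .compile
                 .drain
           }
      a <- seenA.get
      b <- seenB.get
    } yield (a, b)

  def main(args: Array[String]): Unit = println(demo.unsafeRunSync())
}
```

merge is used rather than concurrently because it waits for both sides to finish; with concurrently the queue side would be interrupted as soon as the first stream ends.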
The solution I usually go for is to combine larger actions and use operators like broadcast or parJoin:
def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  // each function is a Pipe[F, A, Unit]; every consumer sees every element of base
  base.broadcastTo(runSeveralThings: _*).compile.drain
Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
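As a rough usage sketch (again assuming fs2 2.x with cats-effect 2, where broadcastTo is available), two consumers can each filter the broadcast stream down to the elements they care about. The object name, the even/odd split and the Ref-based collectors are illustrative assumptions, not part of the original answer.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

object SplitAndRunDemo {
  // Concurrent[IO] needs a ContextShift in cats-effect 2.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def splitAndRun[F[_]: Concurrent, A](
    base: Stream[F, A],
    runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
  ): F[Unit] =
    base.broadcastTo(runSeveralThings: _*).compile.drain

  val demo: IO[(List[Int], List[Int])] =
    for {
      evens <- Ref.of[IO, List[Int]](Nil)
      odds  <- Ref.of[IO, List[Int]](Nil)
      // Each consumer receives every element and keeps only its own half.
      keepEvens = (s: Stream[IO, Int]) =>
                    s.filter(_ % 2 == 0).evalMap(i => evens.update(_ :+ i))
      keepOdds  = (s: Stream[IO, Int]) =>
                    s.filter(_ % 2 != 0).evalMap(i => odds.update(_ :+ i))
      _ <- splitAndRun(Stream.range(0, 10).covary[IO], List(keepEvens, keepOdds))
      e <- evens.get
      o <- odds.get
    } yield (e, o)

  def main(args: Array[String]): Unit = println(demo.unsafeRunSync())
}
```

Because the list of consumers is fixed up front, broadcastTo subscribes all of them itself, so there is no stream left for a caller to forget.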
and send half to stream and the other half ? Would this avoid the need for an intermediary buffer? – Ney