Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?
Yes. E.g., you can use a queue from fs2:
import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
Of course, the problem here is that if a caller ignores either stream, the other one will deadlock and never emit anything. This is the issue you generally run into when you split one stream into several and want every value guaranteed to appear in each substream, irrespective of when that substream is subscribed to.
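To make that caveat concrete, here is a minimal sketch of a caller that drives both halves at once. It assumes fs2 2.x with cats-effect 2 (the versions that have `Queue.noneTerminated` and `enqueue1`); the `SplitStreamDemo` object, the `Int`/`String` types, and the sample input are hypothetical names chosen for illustration.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object SplitStreamDemo {
  // Assumption: cats-effect 2, where Concurrent[IO] is derived from a ContextShift.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Same idea as the snippet above: every element pulled through the first
  // stream is also pushed into a queue that feeds the second stream.
  def splitStream[F[_]: Concurrent, A, B](
      stream: Stream[F, A],
      split: A => B
  ): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (
        stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)),
        q.dequeue.map(split)
      )
    }

  // The B side only emits while the A side is being run, so the A side is
  // run in the background with `concurrently`. Running `bs` on its own
  // would wait forever, because nothing would ever fill the queue.
  val program: IO[List[String]] =
    Stream
      .eval(splitStream[IO, Int, String](Stream.emits(List(1, 2, 3)).covary[IO], _.toString))
      .flatMap { case (as, bs) => bs.concurrently(as) }
      .compile
      .toList
}
```

Calling `SplitStreamDemo.program.unsafeRunSync()` should collect the mapped elements in their original order. The point of the sketch is that the caller, not `splitStream`, is responsible for making sure both streams actually run.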
The solution I usually go for is to combine larger actions and use operators like broadcast or parJoin:
import cats.effect.Concurrent
import fs2.Stream
def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
): F[Unit] =
  base.broadcastTo(runSeveralThings: _*).compile.drain
Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
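As a rough usage sketch, again assuming fs2 2.x with cats-effect 2 (where `broadcastTo` is available), two consumers can be passed in up front and each sees every element. The `SplitAndRunDemo` object and the two `Ref`-based consumers are hypothetical and exist only to make the result observable.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

object SplitAndRunDemo {
  // Assumption: cats-effect 2, where Concurrent[IO] is derived from a ContextShift.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def splitAndRun[F[_]: Concurrent, A](
      base: Stream[F, A],
      runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
  ): F[Unit] =
    base.broadcastTo(runSeveralThings: _*).compile.drain

  // Two illustrative consumers: one sums the elements, the other records
  // their string form. Both are known before the stream starts.
  val program: IO[(Int, List[String])] =
    for {
      sum  <- Ref.of[IO, Int](0)
      seen <- Ref.of[IO, List[String]](Nil)
      _ <- splitAndRun[IO, Int](
        Stream.emits(List(1, 2, 3)).covary[IO],
        List(
          (s: Stream[IO, Int]) => s.evalMap(a => sum.update(_ + a)),
          (s: Stream[IO, Int]) => s.evalMap(a => seen.update(_ :+ a.toString))
        )
      )
      total <- sum.get
      all   <- seen.get
    } yield (total, all)
}
```

Because the consumers are handed over as a list, there is no stream left for a caller to forget to run, which is what avoids the deadlock described earlier.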
A in stream and send half to stream and the other half to stream.map(split)? Would this avoid needing an intermediary buffer? – Ney