It's well documented that merging with an empty fs2.Stream should produce the same fs2.Stream. Here is the quote from the Scaladoc:
Has the property that
merge(Stream.empty, s) == s
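As a minimal sketch of what this property means for the emitted elements, the check below merges a small effect-free stream with an empty one and compares the results. It assumes fs2 2.x with cats-effect 2 (the versions implied by the ContextShift import used in this question); the object name MergeEmptyCheck and its members are made up for illustration. It checks only which elements are emitted, not when effects of s are evaluated.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

// Hypothetical helper object, not part of fs2.
object MergeEmptyCheck {
  // merge needs Concurrent[IO], which is derived from a ContextShift[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // A stream with no effects of its own, lifted into IO.
  val s: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]

  // Elements of s merged with an empty stream.
  def mergedWithEmpty: List[Int] =
    s.merge(Stream.empty.covaryAll[IO, Int]).compile.toList.unsafeRunSync()

  // Elements of s alone.
  def original: List[Int] =
    s.compile.toList.unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(mergedWithEmpty == original)
}
```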
Consider the following complete Scala program with fs2.Stream:
Emitting elements
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}
The program prints the following:
Got value 0
Got value 1
Got value 2
...
This looks correct. Applying the Scaladoc quote above, I concluded that replacing fs2.Stream.repeatEval(ref.get) with fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]) should leave the behavior unchanged. Here is the updated program:
Emitting elements and merging with empty fs2.Stream
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}
The program output is
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
QUESTION: Why does merging with an empty fs2.Stream change the behavior of the program, duplicating the elements of the original fs2.Stream?
ref rather than with merge or the empty stream. It is true that the returned Stream is the same one as before. However, for some reason (that I do not understand yet) the concurrent semantics of get changed. - In any case, get + set as two different operations is not guaranteed to provide deterministic results, as you have seen; you should be using getAndUpdate, which does. – Lithea

Ref is the reason. In the original program I used repeatEval on extracting data from the database with doobie and got duplicated extraction. The behaviour is not reproduced with MVar though... (upd: it's clear enough, since MVar provides mutually exclusive access) – Counterclockwise

merge. But I agree it is surprising enough to be frustrating. I would recommend asking in the gitter channel, where you would get quicker help, and then you can come back and answer your own question here for future readers. – Lithea
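
The getAndUpdate suggestion from the comments can be sketched as follows. This is only an illustration, not a confirmed explanation of the merge behavior: it assumes cats-effect 2 (a version where Ref has getAndUpdate) and fs2 2.x, and the names AtomicCounterSketch, counter and run are made up. The idea is that reading and incrementing happen in one atomic step inside the stream being merged, so the value no longer depends on a separate set running downstream.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical sketch object, not taken from the original program.
object AtomicCounterSketch {
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Each evaluation returns the current value and increments it atomically,
  // so two evaluations can never return the same value, even if the merged
  // side is evaluated before the downstream evalMap has finished.
  def counter(ref: Ref[IO, Int]): Stream[IO, Int] =
    Stream
      .repeatEval(ref.getAndUpdate(_ + 1))
      .merge(Stream.empty.covaryAll[IO, Int])
      .evalMap { value =>
        // Print, pause briefly (shorter than the original 1 second), pass the value on.
        IO(println(s"Got value $value"))
          .flatMap(_ => IO.sleep(100.millis))
          .map(_ => value)
      }

  // Collects the first five values seen downstream of the merge.
  def run(): List[Int] =
    Ref.of[IO, Int](0)
      .flatMap(ref => counter(ref).take(5).compile.toList)
      .unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(run())
}
```

With this shape the downstream stage no longer writes to the Ref at all, so the stream is the only place where the counter advances.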