Why does merging with an empty fs2.Stream change the program's behavior?

It's well documented that merging a stream with an empty fs2.Stream should produce the same fs2.Stream. Here is the quote from the Scaladoc:

Has the property that merge(Stream.empty, s) == s

Consider the following complete Scala program with fs2.Stream:

Emitting elements

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

The program prints the following:

Got value 0
Got value 1
Got value 2
...

and it looks OK. Based on the Scaladoc quote above, I concluded that after replacing

fs2.Stream.repeatEval(ref.get)

with

fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])

the behavior should be the same. Here is the updated program:

Emitting elements and merging with empty fs2.Stream

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

The program output is

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

QUESTION: Why does merging with an empty fs2.Stream change the behavior of the program, duplicating the elements of the original fs2.Stream?

Counterclockwise answered 13/8, 2020 at 19:52 Comment(3)
I guess this has something to do with Ref rather than with merge or the empty stream. It is true that the returned Stream is the same one as before. However, for some reason (that I do not understand yet) the concurrent semantics of get changed. In any case, get + set as two separate operations is not guaranteed to give deterministic results, as you have seen; you should be using getAndUpdate, which is. – Lithea
@LuisMiguelMejíaSuárez I'm not sure that Ref is the reason. In the original program I used repeatEval to extract data from the database with doobie and got duplicated extraction. The behaviour is not reproduced with MVar, though... (upd: that's clear enough, since MVar provides mutually exclusive access) – Counterclockwise
Again, the problem is probably due to concurrent semantics, not really merge. But I agree it is surprising enough to be frustrating. I would recommend asking in the gitter channel; you would get quicker help there, and you can come back and answer your own question here for future readers. – Lithea

The documentation of merge also says:

The implementation always tries to pull one chunk from each side before waiting for it to be consumed by resulting stream. As such, there may be up to two chunks (one from each stream) waiting to be processed while the resulting stream is processing elements.

If I understand this correctly, that would mean that while the resulting stream is busy processing value 0, a new value has already been pulled from the source before ref has been updated.

Strictly speaking I don't think this behavior violates any invariants. But for you it makes a difference because

  • your stream mutates the source from which it is pulling
  • your source stream is always ready to emit an element

To solve the second point you could use a 1-element queue instead of a Ref.
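As a rough sketch of that idea (assuming fs2 2.x with cats-effect 2, as in the question; `QueueSketch` and `firstValues` are hypothetical names, and the sleep is shortened to 100 ms), the Ref can be swapped for a bounded queue of size 1. The source then has nothing to emit until the consumer has enqueued the next value, so a read-ahead pull simply waits:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object QueueSketch {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val contextShiftIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical helper: runs the loop for `n` elements and returns the values seen.
  def firstValues(n: Long): IO[List[Int]] =
    Queue.bounded[IO, Int](1).flatMap { queue =>
      // Seed the queue with the initial value, then consume from it.
      queue.enqueue1(0).flatMap { _ =>
        queue.dequeue
          .merge(Stream.empty.covaryAll[IO, Int])
          .evalMap { value =>
            for {
              _ <- IO.sleep(100.millis)
              // The next value only becomes available once this one is processed.
              _ <- queue.enqueue1(value + 1)
            } yield value
          }
          .take(n)
          .compile
          .toList
      }
    }

  def main(args: Array[String]): Unit =
    println(firstValues(3).unsafeRunSync())
}
```

The difference from the Ref version is that dequeue removes the value, so there is no stale value left for an early pull to observe.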

AFAICT the same issue could occur without using merge. The stream is free to pull as many elements from the source as it sees fit before processing them, as long as the source can emit them. You basically got lucky in your first piece of code because you have a pretty simple stream with 1-element chunks.

Selfstarter answered 14/8, 2020 at 9:13 Comment(0)

It turned out to be a bug.

mpilquist described the reason behind the behavior in the comment as

It pulls the next chunk from the source stream and then acquires the semaphore permit, which is blocked until previous chunk is processed from the queue. Hence, it's always reading 1 chunk ahead.
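The effect of reading one chunk ahead can be illustrated with a small sequential model. This is only a sketch under simplifying assumptions, not fs2's implementation: `ReadAheadModel`, `observed`, `ref` and `buffered` are hypothetical names, a plain `var` stands in for the Ref, and each chunk holds a single element.

```scala
object ReadAheadModel {
  // Simplified model: the producer always holds one value that was read
  // from `ref` before the consumer finished processing the previous one.
  def observed(steps: Int): List[Int] = {
    var ref = 0                 // stands in for Ref[IO, Int]
    var buffered = ref          // producer pulls the first value
    val seen = List.newBuilder[Int]
    for (_ <- 0 until steps) {
      val current = buffered    // consumer receives the buffered value
      buffered = ref            // producer reads ahead before `ref` is updated
      seen += current           // "Got value ..."
      ref = current + 1         // consumer's ref.set(value + 1)
    }
    seen.result()
  }

  def main(args: Array[String]): Unit =
    println(observed(6)) // prints List(0, 0, 1, 1, 2, 2)
}
```

Because the read-ahead happens before the consumer's update, every value is seen twice, which matches the duplicated output in the question.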

Following mpilquist's advice, I created a pull request fixing the issue, which has just been merged.

Counterclockwise answered 15/8, 2020 at 20:59 Comment(0)
