How to share a ZIO Queue between ZIO Tasks via the ZIO Environment
Asked Answered
S

1

5

I'm somewhat new to Scala and ZIO and have run into something of an odd puzzle.

I would like to setup a ZIO Environment containing a ZIO Queue and later have different ZIO Tasks offer and take from this shared Queue.

I tried defining my environment like this

    trait MainEnv extends Console with Clock
    {
      val mainQueue = Queue.unbounded[String]
    }

and accessing the queue from separate tasks like this

    for {
      env      <- ZIO.environment[MainEnv]
      queue    <- env.mainQueue
      ...

but in my test I observe each of my separate tasks is given a separate Queue instance.
Looking at the signature for unbounded

  def unbounded[A]: UIO[Queue[A]]

I observe it doesn't immediately return a Queue but rather returns an effect which returns a queue so while the observed behavior makes sense it isn't at all what I was hoping for and I don't see a clear way to get the behavior I would like.

Would appreciate any suggestions as to how to achieve my goal of setting up different tasks communicating via a shared queue stored in the Environment.


For reference here is my code and output.

sample execution

bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main 
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@65b9a444
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@7c050764

(hangs here - notice env object is the same but queue objects are different so second task is stuck)

/tmp/env-q/test.scala

Here is my complete test which is based on example from slide 37 of https://www.slideshare.net/jdegoes/zio-queue

    package example
    import zio.{App, Queue, ZIO}
    import zio.blocking.Blocking
    import zio.clock.Clock
    import zio.console._

    trait MainEnv extends Console with Clock    // environment with queue
    {
      val mainQueue = Queue.unbounded[String]
    }

    object Main extends App                     // main test
    {
      val task1 = for {                         // task to add something to the queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.offer("Give me Coffee!")
      } yield ()

      val task2 = for {                         // task to remove+print stuff from queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.take.flatMap(putStrLn)
      } yield ()

      val program = ZIO.runtime[MainEnv]        // top level to run both tasks
        .flatMap {
          implicit rts =>
            for {
              _ <- task1.fork
              _ <- task2
            } yield ()
        }

      val runEnv = new MainEnv with Console.Live with Clock.Live

      def run(args: List[String]) =
        program.provide(runEnv).fold(_ => 1, _ => 0)
    }

/tmp/env-q/build.sbt

Here is the build.sbt I used

val ZioVersion = "1.0.0-RC13"

lazy val root = (project in file("."))
  .settings(
    organization := "example",
    name := "example",
    version := "0.0.1-SNAPSHOT",
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-Ypartial-unification"),
    libraryDependencies ++= Seq(
      "dev.zio"                 %% "zio"                 % ZioVersion,
    ),
    addCompilerPlugin("org.spire-math" %% "kind-projector"     % "0.9.6"),
    addCompilerPlugin("com.olegpy"     %% "better-monadic-for" % "0.2.4")
  )

scalacOptions ++= Seq(
  "-deprecation",               // Emit warning and location for usages of deprecated APIs.
  "-encoding", "UTF-8",         // Specify character encoding used by source files.
  "-language:higherKinds",      // Allow higher-kinded types
  "-language:postfixOps",       // Allows operator syntax in postfix position (deprecated since Scala 2.10)
  "-feature",                   // Emit warning and location for usages of features that should be imported explicitly.
  "-Ypartial-unification",      // Enable partial unification in type constructor inference
  "-Xfatal-warnings",           // Fail the compilation if there are any warnings
)
Seismic answered 1/10, 2019 at 4:50 Comment(0)
S
6

In the Official Gitter Channel for ZIO Core, Adam Fraser suggested

You would want to have you environment just have a Queue[String] and then you would want to use a method like provideM with Queue.unbounded to create one queue and provide it to your whole application. That's where provideM as opposed to provide comes in. It let's you satisfy an environment that requires an A by providing a ZIO[A].

A little digging into the ZIO source revealed a helpful example in DefaultTestReporterSpec.scala.

By defining the Environment as

  trait MainEnv extends Console with Clock    // environment with queue
  {
    val mainQueue: Queue[String]
  }

changing my tasks to access env.mainQueue with = instead of <- (because mainQueue is a Queue[String] now and not a UIO[Queue[String]], removing runEnv and changing the run method in my test to use provideSomeM

  def run(args: List[String]) =
    program.provideSomeM(
      for {
        q <- Queue.unbounded[String]
      } yield new MainEnv with Console.Live with Clock.Live {
        override val mainQueue = q
      }
    ).fold(_ => 1, _ => 0)

I was able to get the intended result:

sbt:example> run
[info] Running example.Main 
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
Give me Coffee!
[success] Total time: 1 s, completed Oct 1, 2019 7:41:47 AM
Seismic answered 1/10, 2019 at 14:45 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.