I'm somewhat new to Scala and ZIO and have run into something of an odd puzzle.
I would like to set up a ZIO Environment containing a ZIO Queue and later have different ZIO Tasks offer and take from this shared Queue.
I tried defining my environment like this:
trait MainEnv extends Console with Clock
{
  val mainQueue = Queue.unbounded[String]
}
and accessing the queue from separate tasks like this
for {
  env   <- ZIO.environment[MainEnv]
  queue <- env.mainQueue
  ...
but in my test I observe each of my separate tasks is given a separate Queue instance.
Looking at the signature for unbounded
def unbounded[A]: UIO[Queue[A]]
I see that it doesn't return a Queue directly but rather an effect that produces a queue. So while the observed behavior makes sense, it isn't at all what I was hoping for, and I don't see a clear way to get the behavior I would like.
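To make the observation concrete, here is a minimal sketch of what I believe is happening, assuming the same ZIO 1.0.0-RC13 API as in my test. The object name `QueueEffectDemo` and the values `makeQueue` and `check` are made up for illustration. Running the `UIO[Queue[String]]` twice should allocate two unrelated queues, so an item offered to one never shows up in the other.

```scala
import zio.{App, Queue, UIO}
import zio.console._

// Hypothetical demo object; not part of my project.
object QueueEffectDemo extends App {
  // Only a description of "create a queue"; nothing is allocated yet.
  val makeQueue: UIO[Queue[String]] = Queue.unbounded[String]

  // Runs the description twice and reports
  // (same instance?, size of first queue, size of second queue).
  val check: UIO[(Boolean, Int, Int)] = for {
    q1 <- makeQueue                     // first run allocates one queue
    q2 <- makeQueue                     // second run allocates another
    _  <- q1.offer("Give me Coffee!")   // only q1 receives the item
    n1 <- q1.size
    n2 <- q2.size
  } yield (q1 eq q2, n1, n2)

  def run(args: List[String]) =
    check.flatMap(r => putStrLn(r.toString)).fold(_ => 1, _ => 0)
}
```

If my understanding is right, this reports that the two queues are different instances and that only the first one holds an item, which matches the two different queue addresses in my output below.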
I would appreciate any suggestions on how to achieve my goal: setting up different tasks that communicate via a shared queue stored in the Environment.
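For illustration, the shape I am after might look something like the sketch below, assuming the queue can be created once up front and then stored in the environment as a plain value rather than as an effect. The names `SharedQueueEnv`, `SharedQueueMain`, `producer`, `consumer`, and `makeEnv` are hypothetical, and I do not know whether this is the idiomatic way to do it in ZIO.

```scala
import zio.{App, Queue, UIO, ZIO}
import zio.clock.Clock
import zio.console._

// Hypothetical variant of MainEnv: the queue is a value, not an effect.
trait SharedQueueEnv extends Console with Clock {
  val mainQueue: Queue[String]
}

object SharedQueueMain extends App {
  // Task that adds something to the queue held by the environment.
  val producer = for {
    env <- ZIO.environment[SharedQueueEnv]
    _   <- env.mainQueue.offer("Give me Coffee!")
  } yield ()

  // Task that removes and prints one item from the same queue.
  val consumer = for {
    env <- ZIO.environment[SharedQueueEnv]
    _   <- env.mainQueue.take.flatMap(putStrLn)
  } yield ()

  val program = for {
    _ <- producer.fork
    _ <- consumer
  } yield ()

  // Run Queue.unbounded exactly once, then build the environment around the result.
  val makeEnv: UIO[SharedQueueEnv] =
    Queue.unbounded[String].map { q =>
      new SharedQueueEnv with Console.Live with Clock.Live {
        val mainQueue = q
      }
    }

  def run(args: List[String]) =
    makeEnv.flatMap(env => program.provide(env)).fold(_ => 1, _ => 0)
}
```

The difference from my test is that `Queue.unbounded` is evaluated a single time in `makeEnv`, so both tasks read the same `Queue[String]` out of the environment instead of each running the effect again.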
For reference here is my code and output.
sample execution
bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@65b9a444
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@7c050764
(hangs here: note that the env object is the same but the queue objects are different, so the second task is stuck waiting on an empty queue)
/tmp/env-q/test.scala
Here is my complete test, which is based on the example from slide 37 of https://www.slideshare.net/jdegoes/zio-queue
package example

import zio.{App, Queue, ZIO}
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console._

trait MainEnv extends Console with Clock // environment with queue
{
  val mainQueue = Queue.unbounded[String]
}

object Main extends App // main test
{
  val task1 = for { // task to add something to the queue
    env   <- ZIO.environment[MainEnv]
    queue <- env.mainQueue
    _     <- putStrLn(s"env $env queue $queue")
    _     <- queue.offer("Give me Coffee!")
  } yield ()

  val task2 = for { // task to remove+print stuff from queue
    env   <- ZIO.environment[MainEnv]
    queue <- env.mainQueue
    _     <- putStrLn(s"env $env queue $queue")
    _     <- queue.take.flatMap(putStrLn)
  } yield ()

  val program = ZIO.runtime[MainEnv] // top level to run both tasks
    .flatMap {
      implicit rts =>
        for {
          _ <- task1.fork
          _ <- task2
        } yield ()
    }

  val runEnv = new MainEnv with Console.Live with Clock.Live

  def run(args: List[String]) =
    program.provide(runEnv).fold(_ => 1, _ => 0)
}
/tmp/env-q/build.sbt
Here is the build.sbt I used
val ZioVersion = "1.0.0-RC13"

lazy val root = (project in file("."))
  .settings(
    organization := "example",
    name := "example",
    version := "0.0.1-SNAPSHOT",
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-Ypartial-unification"),
    libraryDependencies ++= Seq(
      "dev.zio" %% "zio" % ZioVersion,
    ),
    addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.6"),
    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.2.4")
  )

scalacOptions ++= Seq(
  "-deprecation",          // Emit warning and location for usages of deprecated APIs.
  "-encoding", "UTF-8",    // Specify character encoding used by source files.
  "-language:higherKinds", // Allow higher-kinded types
  "-language:postfixOps",  // Allows operator syntax in postfix position (deprecated since Scala 2.10)
  "-feature",              // Emit warning and location for usages of features that should be imported explicitly.
  "-Ypartial-unification", // Enable partial unification in type constructor inference
  "-Xfatal-warnings",      // Fail the compilation if there are any warnings
)