I'm trying to understand how to reorganize a program which I would previously have written as a sequence of state transitions.
I have some business logic:
type In = Long
type Count = Int
type Out = Count
type S = Map[In, Count]
val inputToIn: String => Option[In]
= s => try Some(s.toLong) catch { case _ : Throwable => None }
def transition(in: In): S => (S, Out)
= s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }
val ZeroOut: Out = 0
val InitialState: S = Map.empty
With these I wish to construct a program to pass in some initial state (an empty Map), read input from stdin, convert it to In, run the state transition and output the current state S and the output Out to stdout.
Previously, I would have done something like this:
val runOnce = StateT[IO, S, Out](s => IO.readLn.map(inputToIn) flatMap {
case None => IO((s, ZeroOut))
case Some(in) => val (t, o) = transition(in)(s)
(IO.putStrLn(t.toString) |+| IO.putStrLn(o.toString)) >| ((t, o))
})
Stream.continually(runOnce).sequenceU.eval(InitialState)
However, I'm really struggling to see how to connect this approach (a stream of state transitions) with scalaz-stream. I started with this:
type Transition = S => (S, Out)
val NoTransition: Transition = s => (s, 0)
io.stdInLines.map(inputToIn).map(_.fold(NoTransition)(transition))
This is of type Process[Task, Transition]. I don't really know where to go from there.
- How do I "pass in" my InitialState and run the program, threading in the output S at each step as the input S to the next one?
- How do I get the values of S and Out at each step and print them to stdout (assuming I can convert them to strings)?
In trying to use a single for-comprehension, I get similarly stuck:
for {
i <- Process.eval(Task.now(InitialState))
l <- io.stdInLines.map(inputToIn)
...
Any help is greatly appreciated!
I've got a bit further now.
type In_ = (S, Option[In])
type Out_ = (S, Out)
val input: Process[Task, In_]
= for {
i <- Process.emit(InitialState)
o <- io.stdInLines.map(inputToIn)
} yield (i, o)
val prog =
input.pipe(process1.collect[In_, Out_] {
case (s, Some(in)) => transition(in)(s)
}).to(io.stdOutLines.contramap[Out_](_.toString))
Then
prog.run.run
It doesn't work: It seems like the state is not being threaded through the stream. Rather, at each stage, the initial state is being passed in.
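The behaviour can be reproduced without scalaz-stream at all. The following is a minimal sketch using plain Scala collections as a stand-in for Process (the object name ThreadingDemo and the sample inputs are made up for illustration, and S is keyed by In here). A for-comprehension that draws the state from a one-element source pairs every input with that same state, whereas a scan feeds each step's result into the next step.

```scala
object ThreadingDemo {
  type In = Long
  type Count = Int
  type Out = Count
  type S = Map[In, Count]

  // Same business logic as in the question.
  def transition(in: In): S => (S, Out) =
    s => { val n = s.getOrElse(in, 0); (s + (in -> (n + 1)), n + 1) }

  val InitialState: S = Map.empty

  // Hypothetical sample input: the same number three times.
  val inputs: List[In] = List(7L, 7L, 7L)

  // Same shape as the for-comprehension over Process: the single initial
  // state is combined with every input, so nothing is carried forward.
  val unthreaded: List[(S, Out)] =
    for {
      s  <- List(InitialState)
      in <- inputs
    } yield transition(in)(s)

  // A scan passes the state produced by one step into the next step.
  // The seed's Out (0) is only a placeholder and is dropped by .tail.
  val threaded: List[(S, Out)] =
    inputs.scanLeft((InitialState, 0)) { case ((s, _), in) => transition(in)(s) }.tail
}
```

With these inputs, ThreadingDemo.unthreaded reports a count of 1 for every line, while ThreadingDemo.threaded reports 1, 2, 3.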
Paul Chiusano suggested using process1.scan. So now I do this:
type In_ = In
type Out_ = (S, Out)
val InitialOut_ = (InitialState, ZeroOut)
val program =
io.stdInLines.collect(Function.unlift(inputToIn)).pipe(
process1.scan[In_, Out_](InitialOut_) {
case ((s, _), in) => transition(in)(s)
}).to(io.stdOutLines.contramap[Out_](_.shows))
There's a problem here: in this specific example, my Out type is a monoid, so my initial state can be created using its identity, but this may not generally be the case. What would I do then? (I guess I could use Option, but this seems unnecessary.)
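One way to avoid needing a seed value for Out is to carry only S as the running state and emit the (S, Out) pair produced by each step. Below is a rough sketch of that idea over a plain Scala Iterator, not scalaz-stream; threadState and the object name NoSeedOut are hypothetical names, and the String output is chosen only because it has no natural "zero" in this context. Presumably the same shape could be written as a hand-rolled Process1 that recurses with the new state, but that is not shown here.

```scala
object NoSeedOut {
  // Hypothetical helper: threads a state S through the inputs and emits the
  // (S, O) pair returned by each step. Only an initial S is needed; no
  // initial O is ever constructed.
  def threadState[I, S, O](inputs: Iterator[I], init: S)(step: (S, I) => (S, O)): Iterator[(S, O)] = {
    var state = init // local mutable cell, updated as the iterator is consumed
    inputs.map { i =>
      val (next, out) = step(state, i)
      state = next
      (next, out)
    }
  }

  // Usage: count occurrences, but report each one as a message string.
  val results: List[(Map[Long, Int], String)] =
    threadState(Iterator(5L, 5L, 9L), Map.empty[Long, Int]) { (s, in) =>
      val n = s.getOrElse(in, 0) + 1
      (s + (in -> n), s"$in seen $n time(s)")
    }.toList
}
```

The mutable variable is confined to the helper; callers see only a pure step function and a lazily evaluated iterator of results.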
StateT construct as the carrier monad for your stream? type Carrier[A] = StateT[Task, S, A]; val input: Process[Carrier, Option[In]] = ...; prog.run.run(initialValue).run // prog.run is a Carrier[Unit] i.e. StateT
– Sayre