How to read from TCP and write to stdout?
I can't get a simple scalaz-stream example running that reads from TCP and writes to stdout.

val src = tcp.reads(1024)
val addr = new InetSocketAddress(12345)
val p = tcp.server(addr, concurrentRequests = 1) {
  src ++ tcp.lift(io.stdOutLines)
}
p.run.run

It just sits there, not printing anything.

I've also tried various arrangements using to, always with the tcp.lift incantation to get a Process[Connection, A], including

tcp.server(addr, concurrentRequests = 1)(src) map (_ to tcp.lift(io.stdOutLines))

which doesn't even compile.

Do I need to wye the source and print streams together? An example I found on the original pull request for tcp replacing nio seemed to indicate this, but wye no longer appears to exist on Process, so confusion reigns unfortunately.


Edit: It turns out that, in addition to the type problems explained by Paul, you also need to run the inner processes "manually", for example with p.map(_.run.run).run.run. I doubt that's the idiomatic way to do this, but it does work.

Porter answered 15/12, 2014 at 19:42

You need to pass src through the sink to actually write anything. I think this should do it:

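import java.net.InetSocketAddress
import scalaz.stream.{io,tcp,text}
import scalaz.stream.tcp.syntax._

// Same address as in the question
val addr = new InetSocketAddress(12345)

val p = tcp.server(addr, concurrentRequests = 1) {
  // Decode the incoming bytes to strings, then send each one through the stdout sink
  tcp.reads(1024).pipe(text.utf8Decode) through tcp.lift(io.stdOutLines)
}
p.run.run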

The expression src ++ tcp.lift(io.stdOutLines) should really be a type error. The type of tcp.reads(1024) is Process[Connection,ByteVector], and the type of tcp.lift(io.stdOutLines) is Process[Connection, String => Task[Unit]]. Appending those two processes does not make sense, and the only reason it typechecks is due to the covariance of Process[+F[_],+O]. Scala is "helpfully" inferring Any when you append two processes with unrelated output types.
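
The inference problem can be reproduced without scalaz-stream at all. The sketch below uses a hypothetical covariant container, Box, as a stand-in for Process; Box, CovarianceDemo and the value names are made up for illustration and are not part of any library. Its ++ widens the element type to a common supertype, much as appending two processes does, so appending unrelated element types compiles and yields Any.

```scala
// Hypothetical stand-in for a covariant stream type; not part of scalaz-stream.
final case class Box[+O](items: List[O]) {
  // The result element type B is any common supertype of both sides.
  def ++[B >: O](other: Box[B]): Box[B] = Box(items ++ other.items)
}

object CovarianceDemo {
  val numbers: Box[Int] = Box(List(1, 2))
  val sinks: Box[String => Unit] = Box(List((s: String) => println(s)))

  // Compiles: the only common supertype of Int and String => Unit is Any,
  // so the element type silently becomes Any.
  val mixed: Box[Any] = numbers ++ sinks

  // Appending elements of the same type keeps a useful element type.
  val sameType: Box[Int] = numbers ++ Box(List(3))

  def main(args: Array[String]): Unit = {
    println(mixed.items.length)
    println(sameType.items.sum)
  }
}
```

Writing the expected type explicitly on the result (for example Box[Int] instead of leaving it inferred) turns this kind of accidental widening into a compile error.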

A future release of scalaz-stream may add a constraint on ++ and other functions that exploit covariance to make sure the least upper bound that gets computed isn't something useless like Any or Serializable. This would go a long way to preventing mistakes like this. In the meantime, make sure you understand the types of all the functions you are working with, what they do, and how you are sticking them together.

Slot answered 15/12, 2014 at 19:42

Comments (3):
Using sbt wart-remover would probably catch that Any inference. - Novgorod
Do you mean through io.stdOutLines, without the tcp.lift? Still doesn't seem to be pulling anything off the stream though. - Porter
Ok - with the tcp.lift removed, this does work, but note that you have to p.map(_.run.run).run.run to get the inner processes to run. - Porter
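
For the wart-remover suggestion in the comments, a minimal sbt sketch might look like the following. The plugin coordinates and the version placeholder x.y.z are assumptions here (the organization has changed between releases), so check the WartRemover documentation for the values that match your sbt and Scala versions.

```scala
// project/plugins.sbt
// "x.y.z" is a placeholder, not a real version number.
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "x.y.z")

// build.sbt
// Fail compilation wherever the compiler infers Any.
wartremoverErrors += Wart.Any
```

With this enabled, an expression such as src ++ tcp.lift(io.stdOutLines) would be expected to fail the build rather than silently typecheck.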
