Logging and ignoring exception from Task in scalaz-streams
Asked Answered
H

1

6

Let's take an example from some scalaz-stream docs, but with a theoretical twist.

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run

In this case the file might very well contain some non-double string, and the fahrenheitToCelsius will throw some NumberFormatException. Let's say that in this case we want to maybe log this error and ignore it for further stream processing. What's the idiomatic way of doing it? I've seen some examples, but they usually collectFrom the stream.

Houlihan answered 18/12, 2014 at 15:17 Comment(1)
Probably not so idiomatic for Scalaz, but you can use Try for mapping and in case it failed, log the error the way you want (probably this way: github.com/scalaz/scalaz-stream/blob/master/src/test/scala/…).Flow
F
3

You can do it with scalaz.\/ and additional processing steps

  def fahrenheitToCelsius(line: String): Throwable \/ String = 
     \/.fromTryCatchNonFatal {
        val fahrenheit = line.toDouble
        val celsius = fahrenheit // replace with real convert
        celsius.toString
     }

  def collectSome[T]: PartialFunction[Option[T], T] = {
    case Some(v) => v
  }

  def logThrowable[T]: PartialFunction[Throwable \/ T, Option[T]] = {
    case -\/(err) => 
      err.printStackTrace()
      None
    case \/-(v) => Some(v)
  }

  val converter: Task[Unit] =
    io.linesR("testdata/fahrenheit.txt")
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(fahrenheitToCelsius)
      .map(logThrowable)
      .collect(collectSome)
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("testdata/celsius.txt"))
      .run
Fowlkes answered 18/12, 2014 at 15:45 Comment(3)
Ok, so there's no run/attemptRun executor or a generic handler that can be specified to act on certain Exceptions? Also, I think the .map(collectCome) should be a .collect(collectSome) in this case.Houlihan
by default process will fail on first exception, but a I understand you want to log them all, so you need to catch exceptions on each step that can fail. yeah, it's a typo, is should be collect(collectSome)Fowlkes
Yes, very handy for IO reads, for which scalaz-streams pretty much are made for, especially if you don't have any control over the data read and the quality of the data. It's a stream - in some cases you just want to throw away the corrupt stuff. But the answer is indeed covering.Houlihan

© 2022 - 2024 — McMap. All rights reserved.