How to use IO with Scalaz7 Iteratees without overflowing the stack?

Consider this code (taken from here and modified to use bytes rather than lines of characters).

import java.io.{ File, InputStream, BufferedInputStream, FileInputStream }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
import std.list._

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f)))
  def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1))
  def closeStream(s: InputStream) = IO(s.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[Int, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readByte(reader)) flatMap {
        case None => s.pointI
        case Some(byte) => k(I.elInput(byte)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[Int, ErrorOr] {
    def apply[A] = (s: StepT[Int, ErrorOr, A]) =>
      tryIO(openStream(f)).flatMap(stream => I.iterateeT[Int, ErrorOr, A](
        EitherT(
          enumBuffered(stream).apply(s).value.run.ensuring(closeStream(stream)))))
  }

  def main(args: Array[String]) {
    val action = (
      I.consume[Int, ErrorOr, List] &=
      enumFile(new File(args(0)))).run.run
    println(action.unsafePerformIO())
  }
}
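For comparison, here is a plain-Scala baseline that uses no Scalaz and is deliberately impure. It reads the file the same way the question does, with one read() call per byte, but in a tail-recursive loop. It closes the stream in a finally block, roughly what ensuring(closeStream(stream)) does in enumFile. It is only a sketch for checking what the iteratee version should produce. PlainByteReader and readAllBytes are hypothetical names.

```scala
import java.io.{BufferedInputStream, File, FileInputStream, InputStream}

object PlainByteReader {
  // Reads every byte of f with one read() call per byte (like readByte in the
  // question) and closes the stream in finally (like ensuring(closeStream(...))).
  def readAllBytes(f: File): List[Int] = {
    val in: InputStream = new BufferedInputStream(new FileInputStream(f))
    try {
      // Tail-recursive loop: scalac turns it into a jump, so stack depth is
      // constant regardless of file size. Bytes are prepended, then reversed once.
      @annotation.tailrec
      def loop(acc: List[Int]): List[Int] = in.read() match {
        case -1 => acc.reverse
        case b  => loop(b :: acc)
      }
      loop(Nil)
    } finally in.close()
  }
}
```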

Running this code on a decent-sized file (8kb) throws a StackOverflowError. Some searching turned up that the error could be avoided by using the Trampoline monad instead of IO, but that doesn't seem like a great solution: it sacrifices functional purity just to get the program to complete at all. The obvious fix is to use IO or Trampoline as a monad transformer wrapping the other, but I can't find a transformer version of either, and I'm not enough of a functional-programming guru to write my own (learning more about FP is one of the purposes of this project, but I suspect creating new monad transformers is a bit above my level at the moment). I suppose I could wrap one big IO action around creating, running and returning the result of my iteratees, but that feels more like a workaround than a solution.
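To make the Trampoline idea concrete, here is a minimal hand-rolled trampoline in plain Scala. It uses the usual Return/Suspend/FlatMap construction. It is not Scalaz's Trampoline, which is built on Free. Every name in it (MiniTrampoline, Tramp, defer, countBytes) is hypothetical. The interpreter loop re-associates nested flatMaps, so recursion written against Tramp runs in constant stack space. That is presumably why switching to Trampoline made the overflow go away.

```scala
object MiniTrampoline {
  // A computation that is finished, suspended, or bound to a continuation.
  sealed trait Tramp[A] {
    def flatMap[B](f: A => Tramp[B]): Tramp[B] = FlatMap(this, f)
    def map[B](f: A => B): Tramp[B] = flatMap(a => Return(f(a)))
  }
  final case class Return[A](a: A) extends Tramp[A]
  final case class Suspend[A](resume: () => A) extends Tramp[A]
  final case class FlatMap[A, B](sub: Tramp[A], k: A => Tramp[B]) extends Tramp[B]

  // Interpreter loop: every recursive call is a tail call, and left-nested
  // flatMaps are re-associated to the right, so the JVM stack does not grow.
  @annotation.tailrec
  def run[A](t: Tramp[A]): A = t match {
    case Return(a)  => a
    case Suspend(r) => r()
    case FlatMap(x, f) => x match {
      case Return(a)     => run(f(a))
      case Suspend(r)    => run(f(r()))
      case FlatMap(y, g) => run(y.flatMap(a => g(a).flatMap(f)))
    }
  }

  // Postpones building the rest of the computation until run asks for it.
  def defer[A](t: => Tramp[A]): Tramp[A] = Suspend(() => ()).flatMap(_ => t)

  // Not tail-recursive (adds 1 after the recursive result), yet stack-safe under run.
  def countBytes(bytes: List[Int]): Tramp[Int] = bytes match {
    case Nil       => Return(0)
    case _ :: rest => defer(countBytes(rest)).map(_ + 1)
  }
}
```

Each step hands control back to run instead of growing the JVM stack. So MiniTrampoline.run(MiniTrampoline.countBytes(List.fill(100000)(7))) should return 100000 without overflowing.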

Presumably some monads can't be converted to monad transformers, so I'd like to know if it's possible to work with large files without dropping IO or overflowing the stack, and if so, how?

Bonus question: I can't think of any way for an iteratee to signal that it has encountered an error while processing, other than having it return Either, which makes iteratees harder to compose. The code above shows how to use EitherT to handle errors in the enumerator, but how does that work for the iteratees?
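One common way to let an iteratee signal failure without returning Either from every result is to add failure as a third step state, next to "done" and "continue". Sequencing can then short-circuit on it. Below is a self-contained, plain-Scala sketch of that idea. Step, Done, Cont, Failed, feed and the other names are hypothetical and are not Scalaz's API. In the question's Scalaz code, the ErrorOr monad appears to play a similar role: tryIO shows how a left inside it aborts the pipeline. This sketch also ignores leftover input, which real iteratee libraries track.

```scala
object ErrorIteratees {
  // A step is finished, waiting for input (None = end of input), or failed.
  sealed trait Step[E, A]
  final case class Done[E, A](result: A) extends Step[E, A]
  final case class Cont[E, A](k: Option[E] => Step[E, A]) extends Step[E, A]
  final case class Failed[E, A](error: String) extends Step[E, A]

  // Sequencing short-circuits on Failed, so results never need to be Eithers.
  def flatMap[E, A, B](step: Step[E, A])(f: A => Step[E, B]): Step[E, B] = step match {
    case Done(a)   => f(a)
    case Failed(e) => Failed(e)
    case Cont(k)   => Cont((in: Option[E]) => flatMap(k(in))(f))
  }

  // Parses decimal digits until end of input; fails on any other character.
  def number(acc: Long = 0L): Step[Char, Long] = {
    val k: Option[Char] => Step[Char, Long] = {
      case None                 => Done(acc)
      case Some(c) if c.isDigit => number(acc * 10 + (c - '0'))
      case Some(c)              => Failed(s"unexpected character '$c'")
    }
    Cont(k)
  }

  // Rejects values that do not fit in an unsigned byte.
  def byteValue(n: Long): Step[Char, Long] =
    if (n <= 255) Done(n) else Failed(s"$n does not fit in a byte")

  // Enumerator: feeds each element, then end of input; stops early on Done or Failed.
  @annotation.tailrec
  def feed[E, A](xs: List[E], step: Step[E, A]): Step[E, A] = (xs, step) match {
    case (x :: rest, Cont(k)) => feed(rest, k(Some(x)))
    case (Nil, Cont(k))       => k(None)
    case (_, other)           => other
  }

  // Only the final run turns the outcome into an Either.
  def run[E, A](step: Step[E, A]): Either[String, A] = step match {
    case Done(a)   => Right(a)
    case Failed(e) => Left(e)
    case Cont(_)   => Left("iteratee still expects input after end of input")
  }

  def parseByte(s: String): Either[String, Long] =
    run(feed(s.toList, flatMap(number())(byteValue)))
}
```

Here number and byteValue compose with flatMap even though either may fail. Only the final run produces an Either.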

Decide asked 22/4, 2013 at 1:26
This might be useful to you: termsandtruthconditions.herokuapp.com/blog/2013/03/16/… - Kierakieran
It's a good explanation of why I need to use Trampoline to avoid overflowing the stack, but it doesn't cover how to use both IO and Trampoline. - Decide
IO is trampolined already. - Whorish
Is it I.consume that overflows? This overflows: (I.consume[Int, Id, List] &= EnumeratorT.enumStream(Stream.fill(10000)(1))).run - Discourage
The files I'm working on are rarely bigger than about 40kb, so 8kb is reasonably decent, yeah. Unfortunately I think I need to process bytes one at a time, or at best in very small groups. - Decide

After creating exceptions and printing their stack depth in various places in your code, I concluded that your code itself is not overflowing: everything seems to run in constant stack space. So I looked elsewhere. Eventually I copied the implementation of consume, added some stack-depth printing, and confirmed that the overflow happens there.
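The stack-depth measurement described above does not require throwing anything. You can construct a Throwable and count the frames in its stack trace. The sketch below shows the count growing with non-tail recursion and staying flat for a loop. That difference separates constant-stack code from code that will eventually overflow. StackProbe, nonTail and tail are hypothetical names.

```scala
object StackProbe {
  // Frames on the current thread's stack: create (but do not throw) a
  // Throwable and count its stack-trace elements. The JVM may truncate very
  // deep traces, so this is only meaningful for modest depths.
  def depth(): Int = new Throwable().getStackTrace.length

  // identity(...) keeps the self-call out of tail position, so scalac cannot
  // turn it into a loop and every level adds a frame.
  def nonTail(n: Int): Int = if (n == 0) depth() else identity(nonTail(n - 1))

  // Tail recursion: scalac compiles this into a loop, so depth stays constant.
  @annotation.tailrec
  def tail(n: Int): Int = if (n == 0) depth() else tail(n - 1)
}
```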

So this overflows:

(I.consume[Int, Id, List] &= EnumeratorT.enumStream(Stream.fill(10000)(1))).run

But I then found that this doesn't:

(I.putStrTo[Int](System.out) &= EnumeratorT.enumStream(Stream.fill(10000)(1)))
  .run.unsafePerformIO()

putStrTo uses foldM and, for some reason, does not overflow. So I wondered whether consume could be implemented in terms of foldM. I copied a few things over from consume and tweaked them until it compiled:

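// Assumes the question's imports: scalaz._, Scalaz._, iteratee.{ Iteratee => I, _ }
def consume1[E, F[_]: Monad, A[_]: PlusEmpty: Applicative]: IterateeT[E, F, A[E]] = {
  // Fold over the input with an accumulator, starting from the empty A[E].
  I.foldM[E, F, A[E]](PlusEmpty[A].empty) { (acc: A[E], e: E) =>
    // point(e) <+> acc prepends, so the result is in reverse input order;
    // swap the operands (acc <+> point(e)) to keep input order.
    (Applicative[A].point(e) <+> acc).point[F]
  }
}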

And it worked, printing a long list of ints.
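Here is a plain-Scala analogy, not Scalaz's actual code. It illustrates why an accumulator-based fold is safer than building the result through nested calls, and it covers the ordering detail in consume1. Direct recursion uses one stack frame per element, while foldLeft runs as a loop. Prepending to a List is O(1) but yields reverse order, so the fold reverses once at the end. The same prepend-then-reverse idea would presumably apply to consume1's result. FoldConsume and its methods are hypothetical names.

```scala
object FoldConsume {
  // Direct recursion: one stack frame per element, so very long inputs can
  // overflow the stack.
  def consumeRecursive[E](xs: List[E]): List[E] = xs match {
    case Nil          => Nil
    case head :: tail => head :: consumeRecursive(tail)
  }

  // Accumulator fold: foldLeft is a loop, so stack use is constant.
  // Prepending is O(1) but reverses the order, so reverse once at the end.
  def consumeFold[E](xs: List[E]): List[E] =
    xs.foldLeft(List.empty[E])((acc, e) => e :: acc).reverse
}
```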

Discourage answered 25/4, 2013 at 13:52
It seems consume1 overflows with Scalaz 7.0.3, at least for me. Do you get the same result if you increase the stream size? I'm trying to track down a potentially related bug: I noticed that I get a stack overflow if I run in an Id context, while I get a heap space error if I run in a Trampoline. In your case, however, the error goes away in a trampolined context, which leads me to suspect that the issues may not be related after all. - Hatchment
@AaronNovstrup, it still works with 100000 and Scalaz 7.0.3, so maybe your issue is indeed different. - Discourage
Strange. I'm seeing a stack overflow with consume1 in the Scala console even for a relatively small number of elements (100), using Scala 2.10.2, Scalaz 7.0.3, OpenJDK 64-bit server VM 1.7.0_25, and a stack size of 256k. - Hatchment
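These results seem to depend on the environment; the last comment mentions a 256k stack. One way to compare runs is to execute the same computation on threads with different requested stack sizes. The java.lang.Thread constructor that takes a stackSize argument allows this. The sketch below makes no assumptions about Scalaz. WithStack and onThread are hypothetical names. The JVM treats the requested size as a hint, which some platforms ignore.

```scala
object WithStack {
  // Runs body on a new thread that requests stackBytes of stack and reports
  // either the result or whatever was thrown (including StackOverflowError).
  def onThread[A](stackBytes: Long)(body: => A): Either[Throwable, A] = {
    var result: Either[Throwable, A] = Left(new IllegalStateException("body did not run"))
    val worker = new Thread(null, new Runnable {
      def run(): Unit =
        result = try Right(body) catch { case e: Throwable => Left(e) }
    }, "sized-stack", stackBytes)
    worker.start()
    worker.join() // join() also makes the worker's write to result visible here
    result
  }
}
```

For example, WithStack.onThread(256L * 1024)(expr) approximates the 256k setup. Rerunning with a much larger value shows whether the same expression merely needs more stack.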
