I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process I need to perform on each file in the zip file. Those processes can (and should) be run in parallel.
I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
    map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
    enumZipFile(f)
).run
def longRunningProcess(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }
When I try to run it:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive list in memory of all these IO and Promise objects.
A few questions:
- Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about the longRunningProcess for its side effects.
- Is the Enumerator approach here the wrong approach?
I'm pretty much out of ideas, so anything will help.
Thanks!
Update #1
Here is the stack trace:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
I am currently following nadavwr's advice to verify that everything behaves the way I think it does. I will report back with any updates.
Update #2
Using ideas from both the answers below, I found a decent solution. As huynhjl suggested (and I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to just return a Promise[IOE[Unit]] instead of a reference to the file. That way I have a collection of all IoExceptions at the end. Here is the working solution:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
    map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
    enumZipFile(f)
).run
def longRunningProcess(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }
This solution inflates each entry while asynchronously uploading them. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I'm still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composable pieces that I can use in other parts of our system (this is a very common pattern for us).
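For anyone who wants to see the shape of the fix without scalaz, here is a rough standard-library analogy. It is not the scalaz API and not my actual code: Entry, process, and run are hypothetical names, an Iterator stands in for the enumerator, and scala.concurrent.Future stands in for Promise. The idea it sketches is the same one as above: fold over the entries, start the work for each one, and keep only a small Unit-or-error outcome per entry rather than a reference to the inflated file.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FoldSketch {
  // Hypothetical stand-in for an inflated zip entry.
  final case class Entry(name: String)

  // Hypothetical stand-in for the long-running process. It returns Try[Unit],
  // so the result carries an error (if any) but no reference to the entry.
  // Assumption for the demo: names ending in ".bad" fail.
  def process(e: Entry): Future[Try[Unit]] =
    Future(Try { if (e.name.endsWith(".bad")) throw new IOException(e.name) })

  // Fold over the entries, prepending each pending result to the accumulator.
  // Results come back in reverse input order because of the prepend.
  // Futures start eagerly here, so this sketch applies no backpressure.
  def run(entries: Iterator[Entry]): List[Try[Unit]] = {
    val pending = entries.foldLeft(List.empty[Future[Try[Unit]]]) { (acc, e) =>
      process(e) :: acc
    }
    Await.result(Future.sequence(pending), 10.seconds)
  }
}
```

Calling FoldSketch.run(Iterator(FoldSketch.Entry("a.txt"), FoldSketch.Entry("b.bad"))) gives one outcome per entry, and filtering with _.isFailure leaves just the errors.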
Thanks for all your help!