Scalaz 7 Iteratee to process large zip file (OutOfMemoryError)
I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process I need to perform on each file in the zip file. Those processes can (and should) be run in parallel.

I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:

def enumZipFile(f: File): EnumeratorT[IoExceptionOr[IO[File]], IO]

I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess(iof: IOE[File]): Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

When I try to run it:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive list in memory of all these IO and Promise objects.

A few questions:

  • Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about the longRunningProcess for its side-effects.
  • Is the Enumerator approach here the wrong approach?

I'm pretty much out of ideas, so anything will help.

Thanks!

Update #1

Here is the stack trace:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         ... (the same three IO.flatMap frames repeat for the rest of the trace)

I am currently taking the advice of nadavwr to ensure everything is acting like I think it is. I will report back any updates.

Update #2

Using ideas from both the answers below, I found a decent solution. As huynhjl suggested (and I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to just return a Promise[IOE[Unit]] instead of a reference to the file. That way I have a collection of all IoExceptions at the end. Here is the working solution:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess(iof: IOE[File]): Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

This solution inflates each entry while asynchronously uploading it. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I'm still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composable pieces that I can use in other parts of our system (this is a very common pattern for us).

Thanks for all your help!

Preconscious answered 26/4, 2013 at 3:14 Comment(4)
What does the long process do? Does it compute something from the zip content? – Elegancy
Each file in the zip file is an image. The long process uploads that file to Rackspace CloudFiles. Once I figure this out, I'm going to need to add additional processes that resize the images and then upload them. – Preconscious
Iteratees feel like the wrong abstraction for this job, since you want to parallelize the workload. Actors would work better, I think. – Elegancy
Funny you mention that, because actors are actually where I started; then I read somewhere that they are a poor choice for semi-sequential batch processing. Iteratees were recommended! I agree that the more I dig into this, the more it feels like the wrong abstraction. I'm going to try to debug what I've got, as I have an idea to create an Iteratee that runs N Promises, blocks until it gets the responses, then asks for more input. Does that sound reasonable? Thank you! – Preconscious

Don't use consume. See my other recent answer: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM may be a better choice.
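To illustrate the difference: `consume` accumulates every element into a `List`, while `foldM` only ever holds the accumulator. A minimal sketch of the shape (the Scalaz 7 imports and signature are from the 7.0-era `scalaz.iteratee` API; treat the details as an assumption):

```scala
import scalaz._
import scalaz.iteratee._, Iteratee._
import scalaz.effect.IO

// foldM threads an accumulator through the stream one element at a
// time, so nothing forces the whole input to be retained in memory,
// unlike consume, which collects every element into a List.
// Here the accumulator is just a running count of entries seen.
def countEntries[E]: IterateeT[E, IO, Int] =
  foldM[E, IO, Int](0)((acc, _) => IO(acc + 1))
```

If all you care about is side effects, an accumulator like a count (or `Unit`) lets the garbage collector reclaim each inflated entry as soon as it has been processed.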

Also try to map the file to something else (like a success return code) to see if that allows the JVM to garbage collect the inflated zip entries.

Elegancy answered 26/4, 2013 at 14:57 Comment(1)
Thank you for your answer. In the end, using foldM seemed to be the key. – Preconscious

How expensive (in terms of memory) is your longRunningProcess? How about file deflation? Are they being executed the number of times you expect them to be? (A simple counter would be helpful.)

A stack trace will be helpful to determine the straw that broke the camel's back -- sometimes that's the culprit.

If you want to be certain what's taking up so much memory, you can use the -XX:+HeapDumpOnOutOfMemoryError JVM argument and then analyze it with VisualVM, Eclipse MAT, or other heap analyzers.
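For reference, the invocation looks roughly like this (the jar name and dump path are placeholders):

```shell
# Dump the heap to a file when the JVM hits OutOfMemoryError,
# then open the resulting .hprof file in VisualVM or Eclipse MAT.
java -XX:+HeapDumpOnOutOfMemoryError \
     -XX:HeapDumpPath=/tmp/heapdump.hprof \
     -jar my-app.jar
```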

Followup

It does seem strange to me that you are enumerating promises. It's counterintuitive to kick off a computation independent of both the enumerator and the iteratee. An iteratee-based solution might be better served by an enumerator that returns 'inert' elements instead of promises. Unfortunately, that would make your handling of individual files serial, but that's iteratees for ya -- non-blocking stream processing.

An actor-based solution would fit better IMHO, but both actors and iteratees (especially the latter) seem overkill for what you are trying to accomplish (at least the parts you are sharing).

Please consider plain futures/promises from Scala 2.10's scala.concurrent package, and be sure to take a look at Scala's parallel collections as well. I wouldn't introduce additional concepts into the code before these prove insufficient. Try defining a fixed-size ExecutionContext for constraining your parallelism.
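As a sketch of that suggestion using only the Scala 2.10 standard library (the pool size of 4 and the per-file work are placeholders; the real work would be the upload):

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

// A fixed-size pool caps how many tasks run at once, so resource
// use stays bounded no matter how many entries the zip contains.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

// Stand-in for the per-entry work (e.g. the CloudFiles upload).
def process(name: String): Future[String] =
  Future { name.toUpperCase }

// Future.traverse runs the tasks on the pool and preserves order.
val results: Future[List[String]] =
  Future.traverse(List("a", "b", "c"))(process)

println(Await.result(results, 10.seconds)) // List(A, B, C)
```

The same `ExecutionContext` can back as many `Future`s as you like; only four of them will ever execute concurrently.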

Bracketing answered 26/4, 2013 at 13:33 Comment(2)
Great advice. I'm going through step-by-step to ensure everything is being executed like I'm assuming it is. I updated my question above with the stack trace. I'm going to try the heap dump next. Thanks! – Preconscious
Concerning your followup: I agree with your concerns about using the Iteratee for this process. From what I posted, it definitely seems like overkill. However, the pattern of downloading a file (or files), streaming the contents, processing each entry, then doing something with the result is used all over the place in our app. I feel like Iteratees have given me some nice, reusable chunks of code that I can use to build these bigger processes up. Thank you very much for your time and help! – Preconscious

I started writing this answer after a quick read-through, and somehow had 'stack overflow' stuck in my mind instead of 'out of memory error'... Must be the URL :-)

Still, functional computations that rely on recursion are susceptible to stack overflows, so I've left the answer in place for anybody stumbling across it, and I promise to try to come up with a more relevant answer.

If what you got was a stack overflow, you'd need a 'trampoline': a construct that moves your computation off the stack and onto the heap between recursive steps.
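The idea can be shown with just the standard library's `scala.util.control.TailCalls` (Scalaz's `Free`/`Trampoline` generalizes the same trick to monadic code):

```scala
import scala.util.control.TailCalls._

// Mutual recursion this deep would blow the stack if written
// naively; TailRec reifies each recursive step as a heap object
// instead of a stack frame, and .result runs them in a loop.
def isEven(n: Int): TailRec[Boolean] =
  if (n == 0) done(true) else tailcall(isOdd(n - 1))

def isOdd(n: Int): TailRec[Boolean] =
  if (n == 0) done(false) else tailcall(isEven(n - 1))

println(isEven(100000).result) // true
```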

See section titled "Stackless Scala with Free Monads" in Learning Scalaz Day 18, part of @eed3si9n's excellent series of posts.

See also this gist by @mpilquist, demonstrating a trampolined iteratee.

Bracketing answered 26/4, 2013 at 12:29 Comment(1)
Haha, stackoverflow.com is an unfortunate name when you're talking about long-running, functional processes. – Preconscious
