SparkR collect method crashes with OutOfMemory on Java heap space

As a PoC with SparkR, I'm trying to collect an RDD that I created from text files containing around 4M lines.

My Spark cluster runs on Google Cloud, was deployed with bdutil, and consists of 1 master and 2 workers with 15 GB of RAM and 4 cores each. My HDFS repository is backed by Google Cloud Storage via gcs-connector 1.4.0. SparkR is installed on each machine, and basic tests work on small files.

Here is the script I use:

Sys.setenv("SPARK_MEM" = "1g")
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir=list(spark.executor.memory="1g"))
lines <- textFile(sc, "gs://xxxx/dir/")   # read the text files from Google Cloud Storage
test <- collect(lines)                    # pull every line back into the R session

The first time I run this, it seems to work fine: all the tasks run successfully and Spark's UI says the job completed, but I never get the R prompt back:

15/06/04 13:36:59 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 INFO Slf4jLogger: Slf4jLogger started
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started [email protected]:52439
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started [email protected]:4040

15/06/04 13:37:54 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library is available
15/06/04 13:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library not loaded
15/06/04 13:37:55 INFO FileInputFormat: Total input paths to process : 68
[Stage 0:=======================================================>                                                                                     (27 + 10) / 68]

Then, after a Ctrl-C to get the R prompt back, I try to run the collect method again; here is the result:

[Stage 1:==========================================================>                                                                                   (28 + 9) / 68]15/06/04 13:42:08 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at org.spark_project.protobuf.ByteString.toByteArray(ByteString.java:515)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:64)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I understand the exception message, but I don't understand why I am getting it the second time. Also, why does collect never return after the job completes in Spark?

I Googled every piece of information I have, but I had no luck finding a solution. Any help or hint would be greatly appreciated!

Thanks

Mcnew answered 4/6, 2015 at 13:45 Comment(4)
I have no idea about the Spark script, but the Spark context must be closed for the prompt to come back.Enriqueenriqueta
Thanks for your answer. This is interactive mode, so it's normal that I don't close the context here. It's like using spark-shell.Mcnew
How large is your 4M-line file?Gneiss
Are you caching your data (RDD)?Kendrakendrah

This appears to be a combination of inefficient Java in-memory object representations and some long-lived object references that keep the old collection from being garbage-collected in time for the new collect() call to overwrite it in place.

I experimented with some options, and for my sample 256MB file containing ~4M lines I did reproduce your behavior: collect is fine the first time but OOMs the second time when using SPARK_MEM=1g. I then set SPARK_MEM=4g instead, and with that I'm able to Ctrl-C and re-run test <- collect(lines) as many times as I want.
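
For reference, the only change between the failing and working runs was the memory setting; a minimal sketch of the working variant, assuming the same AMPLab-era SparkR API as in the question:

Sys.setenv("SPARK_MEM" = "4g")   # larger heap for the JVM backing SparkR
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir=list(spark.executor.memory="1g"))
lines <- textFile(sc, "gs://xxxx/dir/")
test <- collect(lines)           # can now be interrupted and re-run repeatedly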

For one thing, even if references didn't leak, note that after the first time you ran test <- collect(lines), the variable test is holding that gigantic array of lines. The second time you call it, collect(lines) executes before the result is finally assigned to test, so under any straightforward instruction ordering there's no way to garbage-collect the old contents of test. This means the second run makes the SparkRBackend process hold two copies of the entire collection at the same time, leading to the OOM you saw.
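
A hedged workaround sketch for that particular issue: drop the old binding and trigger a garbage collection on the R side before collecting again, so that at most one copy has to fit there at a time (whether the JVM-side copy is also freed depends on the reference chain discussed below):

rm(test)         # drop the binding to the previously collected vector
invisible(gc())  # ask R to reclaim that memory now
test <- collect(lines)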

To diagnose, I started SparkR on the master and first ran

dhuo@dhuo-sparkr-m:~$ jps | grep SparkRBackend
8709 SparkRBackend

I also checked top and it was using around 22MB of memory. I fetched a heap profile with jmap:

jmap -heap:format=b 8709    # writes a binary heap dump to ./heap.bin
mv heap.bin heap0.bin       # keep this baseline dump for later comparison

Then I ran the first round of test <- collect(lines) at which point running top showed it using ~1.7g of RES memory. I grabbed another heap dump. Finally, I also tried test <- {} to get rid of references to allow garbage-collection. After doing this, and printing out test and showing it to be empty, I grabbed another heap dump and noticed RES still showed 1.7g. I used jhat heap0.bin to analyze the original heap dump, and got:

Heap Histogram

All Classes (excluding platform)

Class   Instance Count  Total Size
class [B    25126   14174163
class [C    19183   1576884
class [<other>  11841   1067424
class [Lscala.concurrent.forkjoin.ForkJoinTask; 16  1048832
class [I    1524    769384
...

After running collect, I had:

Heap Histogram

All Classes (excluding platform)

Class   Instance Count  Total Size
class [C    2784858 579458804
class [B    27768   70519801
class java.lang.String  2782732 44523712
class [Ljava.lang.Object;   2567    22380840
class [I    1538    8460152
class [Lscala.concurrent.forkjoin.ForkJoinTask; 27  1769904

Even after I nulled out test, it remained about the same. This shows 2784858 instances of char[], for a total size of 579MB, and also 2782732 instances of String, presumably holding the char[]s shown above. I followed the reference graph all the way up, and got something like

char[] -> String -> String[] -> ... -> class scala.collection.mutable.DefaultEntry -> class [Lscala.collection.mutable.HashEntry; -> class scala.collection.mutable.HashMap -> class edu.berkeley.cs.amplab.sparkr.JVMObjectTracker$ -> java.util.Vector@0x785b48cd8 (36 bytes) -> sun.misc.Launcher$AppClassLoader@0x7855c31a8 (138 bytes)

And then AppClassLoader had something like thousands of inbound references. So somewhere along that chain, something should have removed its reference but failed to do so, causing the entire collected array to sit in memory while we try to fetch a second copy of it.

Finally, to answer your question about hanging after the collect, it appears it has to do with the data not fitting in the R process's memory; here's a thread related to that issue: https://www.mail-archive.com/[email protected]/msg29155.html

I confirmed this by using a smaller file with only a handful of lines: running collect on it indeed does not hang.
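
If the goal is only to inspect the data rather than pull all of it into R, a lighter-weight sketch, assuming the SparkR build in use exposes take() (function availability differs across SparkR versions):

sample_lines <- take(lines, 100)   # fetch only the first 100 lines
head(sample_lines)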

Gneiss answered 6/6, 2015 at 1:37 Comment(5)
Hi Dennis, once again, thanks for your help. I'm gonna look into it and I'll get back to you ASAP!Mcnew
I'm not really familiar with the Java memory analysis tools; I should have dug into this myself. Thanks for doing it! So what you're saying is that there is a bug that prevents garbage collection at some point, even if we explicitly set the variable to nothing? (Just to be sure I understood.) Thanks!Mcnew
It's subtle, so it may be a grey area as to whether it'd be considered an actionable bug; I did verify that once the new collect() has completed, the original collect() results do end up being garbage collected, so as long as there's no OOM on that second call, the memory doesn't appear to actually continue to leak any further.Gneiss
The practical answer here, though, is that you need at least enough memory to store 2x the collect() data if you're reassigning into the same variable, since the right-hand side always has to be evaluated before being assigned to the left-hand side, and the right-hand side might include the left-hand-side variable as a subexpression. In general, for strings, if your RDD is 256MB of plaintext, then the Java Strings are a minimum of 2x that size since a char is 2 bytes vs the typical 1-byte char in plaintext, plus ~10% for the String wrapper, and then general overhead. To be safe, I'd go with 3x the space (a rough calculation is sketched after these comments).Gneiss
And to clarify on the String overhead, it's more like 40 bytes per String. If you have tiny strings, the String overhead could be like 100%-1000%. At 100 characters it'd be closer to 10% overhead. Really long strings have less overhead as a percentage. EDIT: Underestimated Java String in memory; it's more like 40 bytes than 16 bytes.Gneiss
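
A rough back-of-the-envelope version of that sizing argument, using the per-String overhead and the 2x rule from the comments above (the average line length is an assumed value, not something measured on this cluster):

plaintext_mb       <- 256                               # text size on disk
avg_line_chars     <- 64                                # assumed average line length
n_lines            <- plaintext_mb * 1024^2 / avg_line_chars
char_data_mb       <- plaintext_mb * 2                  # a Java char is 2 bytes
string_overhead_mb <- n_lines * 40 / 1024^2             # ~40 bytes per String object
java_side_mb       <- char_data_mb + string_overhead_mb
peak_mb            <- 2 * java_side_mb                  # old and new results coexist briefly
peak_mb                                                 # ~1340 MB: too big for 1g, fine for 4g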
