Spark java.lang.OutOfMemoryError: Java heap space
My cluster: 1 master, 11 slaves, each node has 6 GB memory.

My settings:

spark.executor.memory=4g, -Dspark.akka.frameSize=512

Here is the problem:

First, I read some data (2.19 GB) from HDFS into an RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Second, I do something with this RDD:

val res = imageBundleRDD.map(data => {
  val desPoints = threeDReconstruction(data._2, bg)
  (data._1, desPoints)
})

Last, I write the output to HDFS:

res.saveAsNewAPIHadoopFile(...)

When I run my program it shows:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Are there too many tasks?

PS: Everything is OK when the input data is about 225 MB.

How can I solve this problem?

Hospice answered 15/1, 2014 at 13:30 Comment(5)
How do you run Spark? Is it from the console? Or which deploy scripts do you use? – Peafowl
I use sbt to compile and run my app: sbt package, then sbt run. I implemented the same program on Hadoop a month ago and met the same OutOfMemoryError, but in Hadoop it was easily solved by increasing the value of mapred.child.java.opts from -Xmx200m to -Xmx400m. Does Spark have any JVM setting for its tasks? I wonder if spark.executor.memory means the same thing as mapred.child.java.opts in Hadoop. In my program spark.executor.memory has already been set to 4g, much bigger than -Xmx400m in Hadoop. Thank you~ – Hospice
Are the three steps you mention the only ones you do? What's the size of the data generated by (data._1, desPoints)? This should fit in memory, especially if this data is then shuffled to another stage. – Revell
What is the memory configuration for the driver? Check which server gets the out-of-memory error. Is it the driver or one of the executors? – Lobell
See all configuration properties here: spark.apache.org/docs/2.1.0/configuration.html – Kabyle

I have a few suggestions:

  • If your nodes are configured to have 6g maximum for Spark (and are leaving a little for other processes), then use 6g rather than 4g: spark.executor.memory=6g. Make sure you're using as much memory as possible by checking the UI (it will say how much memory you're using).
  • Try using more partitions; you should have 2 - 4 per CPU. IME increasing the number of partitions is often the easiest way to make a program more stable (and often faster). For huge amounts of data you may need way more than 4 per CPU; I've had to use 8000 partitions in some cases!
  • Decrease the fraction of memory reserved for caching, using spark.storage.memoryFraction (see the sketch below). If you don't use cache() or persist in your code, this might as well be 0. Its default is 0.6, which means you only get 0.4 * 4g of memory for your heap. IME reducing the memory fraction often makes OOMs go away. UPDATE: From Spark 1.6 apparently we will no longer need to play with these values; Spark will determine them automatically.
  • Similar to the above, but for the shuffle memory fraction (spark.shuffle.memoryFraction). If your job doesn't need much shuffle memory then set it to a lower value (this might cause your shuffles to spill to disk, which can have a catastrophic impact on speed). Sometimes when it's a shuffle operation that's OOMing you need to do the opposite, i.e. set it to something large, like 0.8, or make sure you allow your shuffles to spill to disk (it's the default since 1.0.0).
  • Watch out for memory leaks; these are often caused by accidentally closing over objects you don't need in your lambdas. The way to diagnose is to look out for the "task serialized as XXX bytes" in the logs; if XXX is more than a few KB or an MB, you may have a memory leak. See https://mcmap.net/q/102159/-task-not-serializable-java-io-notserializableexception-when-calling-function-outside-closure-only-on-classes-not-objects
  • Related to the above: use broadcast variables if you really do need large objects.
  • If you are caching large RDDs and can sacrifice some access time, consider serialising the RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage. Or even caching them on disk (which sometimes isn't that bad if using SSDs).
  • (Advanced) Related to the above: avoid String and heavily nested structures (like Map and nested case classes). If possible try to only use primitive types and index all non-primitives, especially if you expect a lot of duplicates. Choose WrappedArray over nested structures whenever possible. Or even roll your own serialisation - YOU will have the most information regarding how to efficiently pack your data into bytes, USE IT!
  • (Bit hacky) Again when caching, consider using a Dataset to cache your structure, as it will use more efficient serialisation. This should be regarded as a hack when compared to the previous bullet point. Building your domain knowledge into your algo/serialisation can minimise memory/cache space by 100x or 1000x, whereas all a Dataset will likely give is 2x - 5x in memory and 10x compressed (parquet) on disk.

http://spark.apache.org/docs/1.2.1/configuration.html
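
To make the first few bullets concrete, here is a minimal sketch using the Spark 1.x-era property names this answer discusses. The values are illustrative starting points rather than recommendations, and the HDFS path is a placeholder standing in for the asker's input:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("imageBundle")
  .set("spark.executor.memory", "6g")           // what the node can actually spare
  .set("spark.storage.memoryFraction", "0.1")   // little or no cache() => shrink the cache share
  .set("spark.shuffle.memoryFraction", "0.2")   // raise this instead if the shuffle itself OOMs
val sc = new SparkContext(conf)

// stand-in for the asker's imageBundleRDD
val imageBundleRDD = sc.textFile("hdfs:///some/input")

// aim for 2 - 4 partitions per CPU core; go higher for huge inputs
val repartitioned = imageBundleRDD.repartition(sc.defaultParallelism * 4)

// if you must cache a large RDD, cache it serialized to save heap
repartitioned.persist(StorageLevel.MEMORY_ONLY_SER)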

EDIT: (So I can google this more easily) The following is also indicative of this problem:

java.lang.OutOfMemoryError : GC overhead limit exceeded
Speculation answered 30/3, 2014 at 10:43 Comment(12)
Thanks for your suggestions~ If I set spark.executor.memory=6g, Spark will have the problem "check your cluster UI to ensure that workers are registered and have sufficient memory". Setting spark.storage.memoryFraction to 0.1 doesn't solve the problem either. Maybe the problem lies in my code. Thank you! – Hospice
@hequn8128, the Spark executor memory must fit within the Spark worker memory – Bibliology
Useful link: spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage – Rentschler
To your first point, @samthebest, you should not use ALL the memory for spark.executor.memory because you definitely need some amount of memory for I/O overhead. If you use all of it, it will slow down your program. The exception to this might be Unix, in which case you have swap space. – Daman
@Hunle Yes, one needs to allow for other processes on the node. I'll update my answer. – Speculation
In Spark 2.3.0, spark.storage.memoryFraction & spark.shuffle.memoryFraction are deprecated & used only in legacy mode. Confusingly enough, the alternative to spark.storage.memoryFraction is spark.memory.storageFraction (!), while shuffle now has its own dedicated set of configurations & needs to be enabled on a per-executor basis – Monandry
@Monandry Yes, since 1.6 it's supposed to set these automagically. I actually don't like this, as it can be a little harder to know when an OOM is due to a shuffle vs cache vs heap. Sometimes the stack trace is enough. Does Spark have any way to report how much memory is reserved for each? – Speculation
Additional suggestion for anyone getting this OOM when using pandas UDFs: ensure that pyarrow is enabled (it is not enabled by default!). spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") – Cornfield
Would you comment on the specific characteristics of WrappedArray that make it helpful here? Unfortunately, the Scala API docs don't say anything about its performance characteristics (a problem with those docs in general, when most of the collections are designed around specific performance use cases). Thanks! – Irena
My best guess is that WrappedArray guarantees a fixed number of elements but has methods like a Seq. Is that the key to the performance gains, the predictable size of the structures? – Irena
FYI, WrappedArray is deprecated in Scala 2.13 in favor of ArraySeq, which should have similar properties. – Irena
WrappedArray is more well behaved than Array (equality and toString work) but is similarly as efficient as Array in memory usage. Also I vaguely remember Array and Spark not playing nicely together back in the day. TBH, it's been like 10 years since I answered this question, so I can't recall exactly the motivations :( @Irena – Speculation

To add a use case to this that is often not discussed, I will propose a solution for when submitting a Spark application via spark-submit in local mode.

According to the gitbook Mastering Apache Spark by Jacek Laskowski:

You can run Spark in local mode. In this non-distributed single-JVM deployment mode, Spark spawns all the execution components - driver, executor, backend, and master - in the same JVM. This is the only mode where a driver is used for execution.

Thus, if you are experiencing OOM errors with the heap, it suffices to adjust the driver-memory rather than the executor-memory.

Here is an example:

spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --master local[*] \
  target/scala-2.10/simple-project_2.10-1.0.jar
Garv answered 12/3, 2016 at 18:56 Comment(3)
What percentage should we be considering for driver memory in stand-alone mode? – Lowland
@Brian, in local mode, does the driver memory need to be larger than the input data size? Is it possible to specify the number of partitions for the input dataset, so the Spark job can deal with a dataset much larger than the available RAM? – Disquietude
Driver memory doesn't need to be larger than the input size. Consider you have a 160 GB file to be loaded into your cluster: would you create a driver with 161 GB? That's not feasible. It's how you determine the number of executors, their memory, and the buffer for overhead memory and their OS. You need to calculate all these things by looking at the YARN UI and the cluster memory given to you. For better performance, you also need to consider the executor cores, which should always be between 3 and 5. @Disquietude – Garik

You should configure offHeap memory settings as shown below:

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.executor.memory", "70g")
  .config("spark.driver.memory", "50g")
  .config("spark.memory.offHeap.enabled", true)
  .config("spark.memory.offHeap.size", "16g")
  .appName("sampleCodeForReference")
  .getOrCreate()

Give the driver memory and executor memory as per your machine's RAM availability. You can increase the offHeap size if you are still facing the OutOfMemory issue.

Galvanotropism answered 11/6, 2018 at 14:50 Comment(3)
Setting the driver memory in your code will not work; read the Spark documentation for this: Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory” and “spark.executor.instances”; this kind of properties may not be affected when set programmatically through SparkConf at runtime, or the behavior depends on which cluster manager and deploy mode you choose, so it is suggested to set them through the configuration file or spark-submit command line options. – Bunce
I just added the configurations using the spark-submit command to fix the heap size issue. Thanks. – Bickel
Note .config("spark.memory.offHeap.enabled",true) should be changed to .config("spark.memory.offHeap.enabled","true") for pyspark users. – Offstage

You should increase the driver memory. In your $SPARK_HOME/conf folder you should find the file spark-defaults.conf; edit it and set spark.driver.memory 4000m, depending on the memory on your master. This is what fixed the issue for me, and everything runs smoothly.
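
For illustration, the relevant line in that file would look something like this (4000m is the value mentioned above; size it to your master's memory):

# $SPARK_HOME/conf/spark-defaults.conf
spark.driver.memory 4000m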

Foolproof answered 3/9, 2015 at 21:15 Comment(0)

Have a look at the start-up scripts: a Java heap size is set there, and it looks like you're not setting this before running the Spark worker.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

You can find the documentation to deploy scripts here.

Peafowl answered 16/1, 2014 at 9:3 Comment(3)
Thank you~ I will try it later. The Spark UI shows that the memory of every executor is 4096. So the setting has been applied, right? – Hospice
Saw your answer while facing a similar issue (#34762932). Looking at the link you provided, it looks like setting Xms/Xmx is not there anymore; can you tell why? – Subjectivism
The content of the script linked to by "start up scripts" has unfortunately changed. No such options exist as of 2019-12-19. – Adal

I suffered from this issue a lot when using dynamic resource allocation. I had thought it would utilize my cluster resources to best fit the application.

But the truth is that dynamic resource allocation doesn't set the driver memory and keeps it at its default value, which is 1G.

I resolved this issue by setting spark.driver.memory to a number that suits my driver's memory (for 32 GB of RAM I set it to 18G).

You can set it using the spark-submit command as follows:

spark-submit --conf spark.driver.memory=18g

A very important note: this property will not be taken into consideration if you set it from code, according to Spark Documentation - Dynamically Loading Spark Properties:

Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options; another is mainly related to Spark runtime control, like “spark.task.maxFailures”, this kind of properties can be set in either way.

Bunce answered 27/12, 2018 at 9:9 Comment(1)
You should use --conf spark.driver.memory=18g – Hardtop

Broadly speaking, Spark executor JVM memory can be divided into two parts: Spark memory and user memory. This is controlled by the property spark.memory.fraction; the value is between 0 and 1. When working with images or doing memory-intensive processing in Spark applications, consider decreasing spark.memory.fraction. This will make more memory available for your application's work. Spark can spill, so it will still work with a smaller memory share.

The second part of the problem is division of work. If possible, partition your data into smaller chunks; smaller data possibly needs less memory. But if that is not possible, you sacrifice compute for memory. Typically a single executor will be running multiple cores. The total memory of an executor must be enough to handle the memory requirements of all concurrent tasks. If increasing executor memory is not an option, you can decrease the cores per executor so that each task gets more memory to work with. Test with 1-core executors that have the largest possible memory you can give, then keep increasing cores until you find the best core count.
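
Both knobs together might look like this at submit time. This is a sketch only: the class and jar names are placeholders, and the values are starting points to test from, not recommendations.

# lower spark.memory.fraction to leave more heap for user memory,
# and run fewer cores per executor so each task gets a bigger share
spark-submit \
  --conf spark.memory.fraction=0.4 \
  --executor-cores 1 \
  --executor-memory 6g \
  --class MyClass myApp.jar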

Wardieu answered 3/7, 2018 at 6:52 Comment(0)

The location to set the memory heap size (at least in spark-1.0.0) is in conf/spark-env.sh. The relevant variables are SPARK_EXECUTOR_MEMORY & SPARK_DRIVER_MEMORY. More docs are in the deployment guide.
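
A hedged example of what those lines might look like (the values are placeholders; size them to your nodes):

# conf/spark-env.sh
export SPARK_EXECUTOR_MEMORY=4g
export SPARK_DRIVER_MEMORY=2g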

Also, don't forget to copy the configuration file to all the slave nodes.

Gregor answered 6/8, 2014 at 11:33 Comment(0)

Did you dump your master GC log? I met a similar issue and found that SPARK_DRIVER_MEMORY only sets the -Xmx heap. The initial heap size remains 1G, and the heap size never scales up to the -Xmx limit.

Passing --conf "spark.driver.extraJavaOptions=-Xms20g" resolved my issue.

Run ps aux | grep java and you'll see the following:

24501 30.7 1.7 41782944 2318184 pts/0 Sl+ 18:49 0:33 /usr/java/latest/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* -Xmx30g -Xms20g

Neutrino answered 8/8, 2019 at 20:15 Comment(0)

I have a few suggestions for the above-mentioned error.

● Check the executor memory assigned, as an executor might have to deal with partitions requiring more memory than what is assigned.

● Check whether many shuffles are taking place, as shuffles are expensive operations involving disk I/O, data serialization, and network I/O.

● Use broadcast joins.

● Avoid using groupByKey and try to replace it with reduceByKey (see the sketch after this list).

● Avoid using huge Java objects wherever shuffling happens.
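
A minimal Scala sketch of the last two suggestions, with made-up data for illustration: reduceByKey combines values on each partition before the shuffle, and broadcast ships the small side of a join to every executor instead of shuffling both sides.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("shuffleFriendly").getOrCreate()
val sc = spark.sparkContext

// reduceByKey pre-aggregates per partition before shuffling;
// groupByKey would ship every raw value across the network first
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
val counts = pairs.reduceByKey(_ + _)
// avoid: pairs.groupByKey().mapValues(_.sum)

// broadcast join: the small table is copied to each executor once
val big = spark.range(1000000L).toDF("id")
val small = spark.range(100L).toDF("id")
val joined = big.join(broadcast(small), "id")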

Canopus answered 13/2, 2019 at 10:39 Comment(0)

From my understanding of the code provided above, it loads the file, does a map operation, and saves it back. There is no operation that requires a shuffle. Also, there is no operation that requires data to be brought to the driver, hence tuning anything related to shuffle or the driver may have no impact. The driver does have issues when there are too many tasks, but that was only up to Spark 2.0.2. There can be two things going wrong.

  • There is only one or a few executors. Increase the number of executors so that they can be allocated to different slaves. If you are using YARN, you need to change the num-executors config; if you are using Spark standalone, you need to tune the number of cores per executor and the Spark max cores conf. In standalone mode, num executors = max cores / cores per executor (see the sketch after this list).
  • The number of partitions is very low, or maybe there is only one. If this is low, then even with multiple cores and multiple executors it will not be of much help, as parallelization depends on the number of partitions. So increase the partitions by doing imageBundleRDD.repartition(11).
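
As a hedged illustration of the standalone arithmetic (the master URL, class, and jar names are placeholders): 33 max cores with 3 cores per executor yields 11 executors, one per slave.

spark-submit \
  --master spark://master:7077 \
  --total-executor-cores 33 \
  --executor-cores 3 \
  --class MyClass myApp.jar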
Quadrennial answered 22/10, 2019 at 5:13 Comment(0)

Simple: if you are using a script or a Jupyter notebook, set the config only when you start building the Spark session...

spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "15g").appName('testing').getOrCreate()
Cateyed answered 27/5, 2022 at 7:0 Comment(1)
Worked like magic for me! – Entreat

Heap space errors generally occur due to bringing too much data back to either the driver or the executor. In your code it does not seem like you are bringing anything back to the driver; instead, you may be overloading the executors, which map an input record/row to another using the threeDReconstruction() method. I am not sure what is in the method definition, but that is definitely causing this overloading of the executor. Now you have 2 options:

  1. Edit your code to do the 3-D reconstruction in a more efficient manner.
  2. Do not edit the code, but give more memory to your executors, as well as more memory overhead. [spark.executor.memory or spark.driver.memoryOverhead]

I would advise being careful with the increase and using only as much as you need. Each job is unique in terms of its memory requirements, so I would advise empirically trying different values, increasing every time by a power of 2 (256M, 512M, 1G, and so on).

You will arrive at a value for the executor memory that will work. Try re-running the job with this value 3 or 5 times before settling on this configuration.
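
For example, a hedged sketch of such a ramp (the class and jar names are placeholders; keep the smallest values that let the job pass):

spark-submit --executor-memory 512m --conf spark.driver.memoryOverhead=256m --class MyClass myApp.jar
spark-submit --executor-memory 1g --conf spark.driver.memoryOverhead=512m --class MyClass myApp.jar
spark-submit --executor-memory 2g --conf spark.driver.memoryOverhead=1g --class MyClass myApp.jar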

Jocelyn answered 1/12, 2020 at 1:43 Comment(0)

Setting these exact configurations helped resolve the issue.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
Moonshine answered 22/11, 2019 at 3:52 Comment(0)
