Spark off heap memory leak on Yarn with Kafka direct stream

I am running Spark Streaming 1.4.0 on YARN (Apache Hadoop 2.6.0) with Java 1.8.0_45 and a Kafka direct stream. I am also using Spark with Scala 2.11 support.

The issue I am seeing is that both the driver and executor containers gradually increase their physical memory usage until YARN kills the containers. I have configured up to 192 MB of heap and 384 MB of off-heap space for my driver, but it eventually runs out.

The heap memory appears to be fine, with regular GC cycles. No OutOfMemoryError is ever encountered in any of these runs.

In fact, I am not generating any traffic on the Kafka topics, yet this still happens. Here is the code I am using:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SimpleSparkStreaming extends App {

  val conf = new SparkConf()
  val ssc = new StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
  ssc.checkpoint("checkpoint")

  val topics = Set(conf.get("spark.kafka.topic.name"))
  val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))

  // Receiver-less (direct) Kafka stream; each record is a (key, value) pair
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

  kafkaStream.foreachRDD(rdd => {
    rdd.foreach(x => println(x._2)) // print each message value on the executors
  })
  kafkaStream.print()

  ssc.start()
  ssc.awaitTermination()
}

I am running this on CentOS 7. The spark-submit command I use is the following:

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 

Any help is greatly appreciated

Regards,

Apoorva

Coparcenary answered 13/7, 2015 at 18:1 Comment(4)
I'm encountering the same kind of problem, did you find a solution? – Downatheel
I am having a similar issue, but have not yet reached the saturation point: #35693711 – Grits
Do let me know if you find a solution. – Grits
I am finding myself in the same situation; did you by any chance find the cause or a workaround? – Hinge

Try increasing the number of executor cores. In your example the only core is dedicated to consuming the streaming data, leaving no cores to process the incoming data.
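
As a sketch only (this assumes you set it through configuration rather than the --executor-cores flag, and that the conf is applied before the StreamingContext is created):

import org.apache.spark.SparkConf

// spark.executor.cores is the configuration key behind the --executor-cores flag;
// per this answer's suggestion, request more than one core per executor
val conf = new SparkConf()
  .set("spark.executor.cores", "2")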

Virulent answered 17/3, 2016 at 8:36 Comment(1)
This is a direct stream, so one executor core is OK: spark.apache.org/docs/latest/… – Gratian

It could be a memory leak... Have you tried with conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")?
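
If you go that route, a minimal sketch (assuming the executor setting is applied before the context is created; the driver's own JVM flags normally have to be passed at submit time instead):

import org.apache.spark.SparkConf

// Enable G1 on the executor JVMs programmatically
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
// The driver JVM is already running when application code executes, so pass
// --conf spark.driver.extraJavaOptions=-XX:+UseG1GC to spark-submit instead.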

Richelieu answered 20/7, 2016 at 8:40 Comment(0)

This is not a Kafka-specific answer; it is isolated to Spark and to how poorly its cataloguing system handles consistent persistence and large operations. If you are repeatedly writing to a persistence layer (i.e. re-persisting a DataFrame in a loop after a large operation and then running again) or running a large query (i.e. inputDF.distinct.count), the Spark job will begin placing data into memory and will not efficiently remove the objects that are stale.

This means that, over time, an operation that once ran quickly will steadily slow down until no memory remains available. To try this at home, spin up an AWS EMR cluster with a large DataFrame loaded into the environment and run the query below:

var iterator = 1
val endState = 15
var currentCount = 0L                   // count returns a Long
while (iterator <= endState) {
  currentCount = inputDF.distinct.count // repeat the same large action against the DataFrame
  println("The number of unique records is: " + currentCount)
  iterator += 1
}

While the job is running, watch the memory management in the Spark UI. If the DataFrame is sufficiently large for the session, you will notice the run time growing with each subsequent iteration, mainly because blocks are becoming stale but Spark is unable to identify when to clean them up.
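
If you prefer to check this outside the UI, a rough sketch (assuming an active SparkContext named sc, as in the snippet further below) that lists what the context still considers persistent:

// Print every RDD the SparkContext still tracks as persistent, with its storage level
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id (name=${rdd.name}) held at ${rdd.getStorageLevel}")
}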

The best solution I have found to this problem was writing my DataFrame out, clearing the persistence layer, and loading the data back in. It is a "sledge-hammer" approach to the problem, but for my business case it was the easiest solution to implement, and it gave a roughly 90% reduction in run time for our large tables (from 540 minutes down to around 40, using less memory).

The code I currently use is:

val interimDF = inputDF.action                                // the expensive upstream work
interimDF.write.format(...).option("...", "...").save("...")  // checkpoint the result to external storage
spark.catalog.clearCache()                                    // drop every cached block in the session
val reloadedDF = spark.read.format(...).option("...", "...").load("...").persist
reloadedDF.count                                              // materialize the freshly loaded data

Here is a variant if you do not unpersist DataFrames in child sub-processes:

val interimDF = inputDF.action                                // the expensive upstream work
interimDF.write.format(...).option("...", "...").save("...")  // checkpoint the result to external storage
for ((_, rdd) <- sc.getPersistentRDDs) {                      // explicitly unpersist every cached RDD
  rdd.unpersist()
}
val reloadedDF = spark.read.format(...).option("...", "...").load("...").persist
reloadedDF.count                                              // materialize the freshly loaded data
Trulatrull answered 12/8, 2019 at 17:47 Comment(0)
