Spark DataFrame cache keeps growing

How does Spark decide how many times to replicate a cached partition?

The storage level shown in the Storage tab of the Spark UI says “Disk Serialized 1x Replicated”, yet partitions appear to be replicated onto multiple executors. We have noticed this with the DISK_ONLY storage level on Spark 2.3. We are caching a dataset with 101 partitions (size on disk is 468.4 GB). The data is initially distributed over 101 executors (we have 600 executors in total). As we run queries against this dataset, the size on disk grows, as does the number of executors the data is spread across. We also commonly see one block/partition replicated onto multiple executors on the same node. If the block is stored on disk, why is it not shared between executors on the same node?

persistedDs = dataset.repartition(101).persist(StorageLevel.DISK_ONLY)
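For reference, the storage level actually recorded on the Dataset can be read back programmatically. A minimal Scala sketch, assuming the dataset from the snippet above:

import org.apache.spark.storage.StorageLevel

// Assumes `dataset` as in the snippet above.
val persistedDs = dataset.repartition(101).persist(StorageLevel.DISK_ONLY)
persistedDs.count()               // materialise the cache
// Prints "StorageLevel(disk, 1 replicas)" for DISK_ONLY: one replica per block
// is requested, which the Storage tab renders as "1x Replicated".
println(persistedDs.storageLevel)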
  • Initial Load (screenshot)

  • After running queries on the cached dataset (screenshot)

  • One executor can have two partitions cached on it. Also note that the RDD appears cached multiple times in the attached screenshot.

  • Data distribution on 101 executors (screenshot)

Lugansk answered 9/4, 2019 at 21:44 Comment(2)
Did you find any reason? – Mot
This issue has also been raised with the Spark developers: issues.apache.org/jira/browse/SPARK-44900 – Mot

I referred to the test code you provided on SPARK-44900:

import scala.collection.mutable
import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{md5, rand}
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Driver entry point; the class name matches the --class used in spark-submit below.
object Spark44900 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark44900").getOrCreate()
    import spark.implicits._

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(10))
    // create a pseudo stream fed from a queue of RDDs
    val rddQueue = new mutable.Queue[RDD[Long]]()
    val stream = ssc.queueStream(rddQueue, oneAtATime = true)
    // create a simple cached lookup table
    val lookup: DataFrame = sc.range(start = 0, end = 50000000, numSlices = 10)
      .toDF("id")
      .withColumn("value", md5(rand().cast(StringType)))
      .cache()
    // for every micro-batch perform a value lookup via join against the cached table
    stream.foreachRDD { rdd =>
      val df = rdd.toDF("id")
      df.join(lookup, Seq("id"), "leftouter").count()
    }
    // run the streaming context and keep feeding small random id ranges into the queue
    ssc.start()
    for (_ <- 1 to 1000000) {
      rddQueue.synchronized {
        val firstId = Random.nextInt(50000000)
        rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
      }
      Thread.sleep(10)
    }
    ssc.stop()
  }
}

I ran it with the submission parameters you provided:

spark-submit --class Spark44900 --master yarn --deploy-mode client --executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g --conf spark.dynamicAllocation.enabled=false --conf spark.sql.shuffle.partitions=10 

Here is the information I obtained from the tests:

Before refresh:

https://i.sstatic.net/eomTg.png

After refresh:

https://i.sstatic.net/ihCZW.png

Comparing the two screenshots, it is indeed evident that the data volume reported by the UI keeps increasing.
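To put numbers on that growth without relying on screenshots, the same totals the Storage tab shows can be sampled on the driver. A sketch, assuming access to the SparkContext of the running job (getRDDStorageInfo is a developer-level API, but it reflects what the UI renders):

// Hypothetical helper: print the per-RDD cache totals as seen by the driver.
def printCacheTotals(sc: org.apache.spark.SparkContext): Unit = {
  sc.getRDDStorageInfo.foreach { info =>
    println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} " +
            s"partitions cached, mem=${info.memSize} B, disk=${info.diskSize} B")
  }
}

// For example, call it before and after a batch of lookups:
printCacheTotals(spark.sparkContext)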

Here is the information I retrieved from the logs:

23/12/03 21:10:14 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 was not found
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting remote block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 not found
23/12/03 21:10:38 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 310.2 MB)
23/12/03 21:10:38 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.LongColumnBuilder: Compressor for [id]: org.apache.spark.sql.execution.columnar.compression.LongDelta$Encoder@38e09408, ratio: 0.1251
23/12/03 21:10:38 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.StringColumnBuilder: Compressor for [value]: org.apache.spark.sql.execution.columnar.compression.PassThrough$Encoder@5d47e4a2, ratio: 1.0
23/12/03 21:10:38 DEBUG storage.BlockManager: Put block rdd_1260_1 locally took  24420 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Putting block rdd_1260_1 without replication took  24423 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:38 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:49 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 INFO storage.BlockManager: Found block rdd_1260_1 locally
23/12/03 21:11:52 INFO storage.BlockManager: Dropping block rdd_1260_1 from memory
23/12/03 21:11:52 DEBUG memory.MemoryStore: Block rdd_1260_1 of size 185234096 dropped from memory (free 207706349)
23/12/03 21:11:52 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:11:52 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:58 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 133.6 MB)
23/12/03 21:11:58 INFO storage.BlockManager: Found block rdd_1260_1 locally

The logs above track rdd_1260_1 specifically. Based on its behaviour, the data for this partition is repeatedly stored in and then dropped from memory (the level in the log, StorageLevel(disk, memory, deserialized, 1 replicas), corresponds to the MEMORY_AND_DISK level that cache() uses by default). The block is written to disk only on the first lookup; subsequent lookups hit the locally cached block directly.

Based on this observation, I suspect this is an issue with the UI display; in reality the cache is not being replicated over and over.

As for the disk usage on HDFS: the cluster I used for testing contains no other data. Judging by data volume alone, the actual usage is only 5.8 GB, which is far from the Disk Size of 222.9 GB displayed in the UI. Based on this comparison, I believe this could be a bug in the Spark metrics module, but I have not dug further into the specific cause.
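As a cross-check that does not depend on the web page itself, the Spark monitoring REST API exposes the same per-RDD storage numbers that the Storage tab renders. A minimal sketch, assuming the driver UI is reachable on port 4040 and using a placeholder application id:

import scala.io.Source

// Hypothetical host/port and application id; substitute the values of the actual run.
val appId = "application_1234567890_0001"
val url   = s"http://localhost:4040/api/v1/applications/$appId/storage/rdd"

// The endpoint returns JSON with one entry per cached RDD, including its
// reported memory and disk usage, which should match the Storage tab.
println(Source.fromURL(url).mkString)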

The above represents the findings I currently have. I hope this aligns with the issue you've been facing. If there are any misconceptions on my part, I would appreciate your clarification. Thank you.

Adon answered 3/12, 2023 at 14:50 Comment(0)
