I used the test code you provided in SPARK-44900:
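    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{md5, rand}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable
    import scala.util.Random

    object Spark44900 {
      def main(args: Array[String]): Unit = {
        // assumed: the session picks up its settings from spark-submit
        val spark = SparkSession.builder().getOrCreate()
        import spark.implicits._
        val sc = spark.sparkContext
        val ssc = new StreamingContext(sc, Seconds(10))
        // create a pseudo stream
        val rddQueue = new mutable.Queue[RDD[Long]]()
        val stream = ssc.queueStream(rddQueue, oneAtATime = true)
        // create a simple lookup table (cached with the default storage level)
        val lookup: DataFrame = sc.range(start = 0, end = 50000000, numSlices = 10)
          .toDF("id")
          .withColumn("value", md5(rand().cast(StringType)))
          .cache()
        // for every micro-batch perform value lookup via join
        stream.foreachRDD { rdd =>
          val df = rdd.toDF("id")
          df.join(lookup, Seq("id"), "leftouter").count()
        }
        // run the streaming
        ssc.start()
        // keep feeding small RDDs of 10000 consecutive ids into the queue
        for (_ <- 1 to 1000000) {
          rddQueue.synchronized {
            val firstId = Random.nextInt(50000000)
            rddQueue += sc.range(start = firstId, end = firstId + 10000, numSlices = 4)
          }
          Thread.sleep(10)
        }
        ssc.stop()
      }
    }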
And used the parameters you provided:
spark-submit --class Spark44900 --master yarn --deploy-mode client --executor-cores 2 --num-executors 5 --executor-memory 1250m --driver-memory 1g --conf spark.dynamicAllocation.enabled=false --conf spark.sql.shuffle.partitions=10
Here is the information I obtained from the tests:
Before refresh:
https://i.sstatic.net/eomTg.png
After refresh:
https://i.sstatic.net/ihCZW.png
Comparing the two UI screenshots, the reported data volume is indeed increasing.
Here is the information I retrieved from the logs:
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 was not found
23/12/03 21:10:14 DEBUG storage.BlockManager: Getting remote block rdd_1260_1
23/12/03 21:10:14 DEBUG storage.BlockManager: Block rdd_1260_1 not found
23/12/03 21:10:38 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 310.2 MB)
23/12/03 21:10:38 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.LongColumnBuilder: Compressor for [id]: org.apache.spark.sql.execution.columnar.compression.LongDelta$Encoder@38e09408, ratio: 0.1251
23/12/03 21:10:38 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:10:38 DEBUG columnar.StringColumnBuilder: Compressor for [value]: org.apache.spark.sql.execution.columnar.compression.PassThrough$Encoder@5d47e4a2, ratio: 1.0
23/12/03 21:10:38 DEBUG storage.BlockManager: Put block rdd_1260_1 locally took 24420 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Putting block rdd_1260_1 without replication took 24423 ms
23/12/03 21:10:38 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:10:38 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:49 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:49 INFO storage.BlockManager: Found block rdd_1260_1 locally
23/12/03 21:11:52 INFO storage.BlockManager: Dropping block rdd_1260_1 from memory
23/12/03 21:11:52 DEBUG memory.MemoryStore: Block rdd_1260_1 of size 185234096 dropped from memory (free 207706349)
23/12/03 21:11:52 DEBUG storage.BlockManagerMaster: Updated info of block rdd_1260_1
23/12/03 21:11:52 DEBUG storage.BlockManager: Told master about block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Getting local block rdd_1260_1
23/12/03 21:11:57 DEBUG storage.BlockManager: Level for block rdd_1260_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
23/12/03 21:11:58 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 133.6 MB)
23/12/03 21:11:58 INFO storage.BlockManager: Found block rdd_1260_1 locally
The logs above trace a single block, rdd_1260_1. They show the data in this partition being repeatedly stored in memory and dropped from it again (stored at 21:10:38, dropped at 21:11:52, stored again at 21:11:58). Disk caching only occurs during the initial lookup; subsequent lookups hit the locally cached block directly.
Based on this, I suspect the problem is in what the UI displays. The cache itself is not actually being replicated over and over.
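To make the reading of the logs above easier to repeat, here is a minimal sketch in plain Scala (no Spark dependency) that tallies the "stored" and "dropped" events for one block. The object name `BlockLogTally`, the two regular expressions, and the helper names are my own, written against the log lines quoted above; other Spark versions may word these messages differently. It also assumes that "MB" in the log means 1024 × 1024 bytes, which the numbers are consistent with.

```scala
object BlockLogTally {
  // Four of the log lines quoted above for rdd_1260_1.
  val sampleLog: Seq[String] = Seq(
    "23/12/03 21:10:38 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 310.2 MB)",
    "23/12/03 21:11:52 INFO storage.BlockManager: Dropping block rdd_1260_1 from memory",
    "23/12/03 21:11:52 DEBUG memory.MemoryStore: Block rdd_1260_1 of size 185234096 dropped from memory (free 207706349)",
    "23/12/03 21:11:58 INFO memory.MemoryStore: Block rdd_1260_1 stored as values in memory (estimated size 176.7 MB, free 133.6 MB)"
  )

  // Hypothetical patterns, derived only from the messages shown above.
  private val Stored =
    """.*Block (\S+) stored as values in memory \(estimated size ([\d.]+) MB.*""".r
  private val Dropped =
    """.*Block (\S+) of size (\d+) dropped from memory.*""".r

  final case class Tally(stored: Int, dropped: Int, lastDroppedBytes: Option[Long])

  // Count store/drop events for a single block id; all other lines are ignored.
  def tally(lines: Seq[String], blockId: String): Tally =
    lines.foldLeft(Tally(0, 0, None)) {
      case (t, Stored(id, _)) if id == blockId =>
        t.copy(stored = t.stored + 1)
      case (t, Dropped(id, bytes)) if id == blockId =>
        t.copy(dropped = t.dropped + 1, lastDroppedBytes = Some(bytes.toLong))
      case (t, _) => t
    }

  // Convert a byte count to units of 1024 * 1024 bytes.
  def toMiB(bytes: Long): Double = bytes.toDouble / (1024 * 1024)

  def main(args: Array[String]): Unit = {
    val t = tally(sampleLog, "rdd_1260_1")
    println(s"stored=${t.stored}, dropped=${t.dropped}")
    t.lastDroppedBytes.foreach(b => println(f"dropped size = ${toMiB(b)}%.1f MiB"))
  }
}
```

If the dropped size matches the estimated size of the stored block, as it does here, that supports the reading that one and the same block is moving in and out of memory rather than being copied.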
Here is the disk usage in HDFS:
The HDFS cluster I used for testing holds no other data. Judging by data volume alone, actual usage is only 5.8 GB, far below the Disk Size of 222.9 GB shown in the UI. Based on this comparison, I believe this could be a bug in Spark's metrics module, though I have not investigated the specific cause.
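To put a number on that gap, the short sketch below divides the UI figure by the observed usage. The object name `SizeGap` and its members are hypothetical; the two values are simply the ones quoted above.

```scala
object SizeGap {
  // Figures quoted in the text above, both in GB.
  val hdfsUsedGb: Double = 5.8
  val uiDiskSizeGb: Double = 222.9

  // How many times larger the reported figure is than the observed one.
  def ratio(reported: Double, observed: Double): Double = reported / observed

  def main(args: Array[String]): Unit =
    println(f"UI reports ${ratio(uiDiskSizeGb, hdfsUsedGb)}%.1f times the observed usage")
}
```

A gap of this size is hard to explain by real data on disk, which is why I lean towards a reporting problem.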
These are my findings so far. I hope they match the issue you have been facing. If I have misunderstood anything, please correct me. Thank you.