What is the difference between Spark checkpoint and persist to disk? Are both of these stored on the local disk?
There are a few important differences, but the fundamental one is what happens with lineage. persist / cache keeps lineage intact while checkpoint breaks lineage. Let's consider the following examples:
    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

cache / persist:

    val indCache = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)

    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]

    indCache.count
    // 3

    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
checkpoint:

    // Assumes sc.setCheckpointDir(...) has already been called; checkpoint fails without it.
    val indChk = rdd.mapValues(_ > 4)
    indChk.checkpoint

    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []

    indChk.count
    // 3

    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
As you can see, in the first case lineage is preserved even if data is fetched from the cache. This means that data can be recomputed from scratch if some partitions of indCache are lost. In the second case lineage is completely lost after the checkpoint, and indChk no longer carries the information required to rebuild it.
checkpoint, unlike cache / persist, is computed separately from other jobs. That's why an RDD marked for checkpointing should be cached:
It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
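A minimal sketch of that recommendation, written as a standalone Scala script rather than a spark-shell session. The local[2] master, the application name, and the temporary checkpoint directory are assumptions made for illustration; on a cluster the directory would be a DFS path.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import java.nio.file.Files

// Assumption: local mode with two threads, purely for demonstration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("cache-before-checkpoint-sketch"))

// Assumption: a throwaway local directory; use a DFS path in non-local mode.
sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
val indChk = rdd.mapValues(_ > 4)

indChk.cache()       // keep the computed partitions so the checkpoint job can reuse them
indChk.checkpoint()  // only marks the RDD; nothing is written yet
indChk.count()       // the action computes the RDD, then a separate job writes the checkpoint

val wasCheckpointed = indChk.isCheckpointed   // true once the action has finished
val checkpointFile = indChk.getCheckpointFile // Option holding the checkpoint location

println(s"checkpointed: $wasCheckpointed, location: $checkpointFile")

sc.stop() // ends the context; skip this line when pasting into spark-shell
```

Dropping the cache() call should give the same result, but the partitions would then be computed once for count and again for the checkpoint job.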
Finally, checkpointed data is persistent and is not removed after the SparkContext is destroyed.
Regarding data storage, SparkContext.setCheckpointDir, used by RDD.checkpoint, requires a DFS path when running in non-local mode. Otherwise it can be the local file system as well. localCheckpoint and persist without replication should use the local file system.
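As a rough illustration of the storage difference, the sketch below marks one RDD with checkpoint and another with localCheckpoint. The names reliable and local, the local[2] master, and the temporary directory are assumptions for the example only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import java.nio.file.Files

// Assumption: local mode, so a local directory is acceptable as the checkpoint dir.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("local-checkpoint-sketch"))
sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

// Reliable checkpoint: written as files under the configured checkpoint directory.
val reliable = rdd.mapValues(_ > 4)
reliable.checkpoint()
reliable.count()

// Local checkpoint: no checkpoint directory involved; data stays in executor-local storage.
val local = rdd.mapValues(_ > 4)
local.localCheckpoint()
local.count()

val reliableFile = reliable.getCheckpointFile // Some(...): a location under the checkpoint dir
val localFile = local.getCheckpointFile       // None: nothing was written to the checkpoint dir
val bothCheckpointed = reliable.isCheckpointed && local.isCheckpointed

println(s"reliable: $reliableFile, local: $localFile")

sc.stop() // ends the context; skip this line when pasting into spark-shell
```

Both RDDs have their lineage truncated, but only the reliable one survives the loss of an executor, since the local variant trades fault tolerance for speed.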
Important Note:
RDD checkpointing is a different concept from checkpointing in Spark Streaming. The former is designed to address the lineage issue; the latter is all about streaming reliability and failure recovery.
If you persist / cache an RDD, and later dependent RDDs need to be calculated, then the persisted/cached RDD content is used automatically by Spark to speed things up. But if you just checkpoint the same RDD, it won't be utilized when calculating dependent RDDs. I wonder when a checkpointed RDD is used by Spark. If there's a failure, will Spark use it automatically? Or am I supposed to spark.read it manually to get the RDD to continue from? That would explain why it's never deleted by Spark. So, how are checkpoints supposed to be used? – Chadwick
I think you can find a very detailed answer here.
While it is very hard to summarize everything on that page, I will say:
Persist
- Persisting or caching with StorageLevel.DISK_ONLY causes the RDD to be computed and stored in a location such that subsequent uses of that RDD will not go beyond that point when recomputing the lineage.
- After persist is called, Spark still remembers the lineage of the RDD, even though it does not need to replay it while the persisted data is available.
- After the application terminates, the cache is cleared and the persisted files are destroyed.
Checkpointing
- Checkpointing stores the RDD physically in HDFS and destroys the lineage that created it.
- The checkpoint files won't be deleted even after the Spark application terminates.
- Checkpoint files can be used in a subsequent job run or driver program.
- Checkpointing an RDD causes double computation unless the RDD is cached first, because the checkpoint is written by a separate job that runs after the action that computed the RDD.
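The two lists above can be checked with a small experiment. This is only a sketch under assumptions: local[2] mode, a temporary directory standing in for HDFS, and default cleaner settings (checkpoint cleanup on garbage collection is off by default). The names persisted, checkpointed, and checkpointRoot are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import java.nio.file.Files

// Assumption: a local temp directory stands in for a reliable store such as HDFS.
val checkpointRoot = Files.createTempDirectory("spark-checkpoint-demo")

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("persist-vs-checkpoint-sketch"))
sc.setCheckpointDir(checkpointRoot.toString)

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)

val persisted = rdd.mapValues(_ > 4).persist(StorageLevel.DISK_ONLY)
val checkpointed = rdd.mapValues(_ > 4)
checkpointed.checkpoint()

// Run an action on each so that persisting and checkpointing actually happen.
persisted.count()
checkpointed.count()

// persist keeps the original parent; checkpoint replaces it with the checkpoint RDD.
val persistedParent = persisted.dependencies.head.rdd.getClass.getSimpleName
val checkpointedParent = checkpointed.dependencies.head.rdd.getClass.getSimpleName
println(s"persisted parent: $persistedParent, checkpointed parent: $checkpointedParent")

sc.stop() // ends the context; skip this line when pasting into spark-shell

// With default settings the checkpoint files are still on disk after the context stops.
val filesSurvive = checkpointRoot.toFile.list().nonEmpty
println(s"checkpoint files remain after stop: $filesSurvive")
```

The persisted blocks, by contrast, live in Spark's block manager storage and go away with the application.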
You may want to read the article for more details on the internals of Spark's checkpointing and caching operations.
persist(MEMORY_AND_DISK) stores the data frame on disk and in memory temporarily without breaking the lineage of the program, i.e. df.rdd.toDebugString() would return the same output. It is recommended to use persist(*) on a calculation that is going to be reused, to avoid recalculating intermediate results:
    from pyspark import StorageLevel

    df = df.persist(StorageLevel.MEMORY_AND_DISK)
    calculation1(df)
    calculation2(df)
Note that caching the data frame does not guarantee that it will remain in memory until you next use it. Depending on memory usage, the cache can be discarded.
checkpoint(), on the other hand, breaks lineage and forces the data frame to be stored on disk. Unlike cache()/persist(), frequent checkpointing can slow down your program. Checkpointing is recommended when a) working in an unstable environment, to allow fast recovery from failures, or b) storing intermediate states of a calculation in which new entries of the RDD depend on previous entries, i.e. to avoid recalculating a long dependency chain in case of failure.
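Case b) might look like the following sketch, shown in Scala to match the examples earlier in the thread. The loop, the column name value, the interval of five iterations, the local[2] master, and the temporary checkpoint directory are all assumptions chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import java.nio.file.Files

// Assumption: local mode with a throwaway checkpoint directory.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("df-checkpoint-sketch")
  .getOrCreate()
spark.sparkContext.setCheckpointDir(
  Files.createTempDirectory("spark-df-checkpoint").toString)

// Each iteration builds on the previous result, so the plan grows with every step.
var df = spark.range(0, 100).toDF("value")
for (i <- 1 to 20) {
  df = df.withColumn("value", col("value") + 1)
  if (i % 5 == 0) {
    // checkpoint() is eager by default: it materializes the data and
    // returns a new data frame whose lineage starts at the checkpoint.
    df = df.checkpoint()
  }
}

val total = df.count()
println(s"rows after 20 iterations: $total")

spark.stop() // ends the session; skip this line when pasting into spark-shell
```

Checkpointing every iteration would also truncate the lineage, but each call writes the full data frame to disk, which is the slowdown described above.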
If you check the relevant part of the documentation, it talks about writing data to a reliable system, e.g. HDFS. But it is up to you to tell Apache Spark where to write its checkpoint information.
On the other hand, persisting is about caching data mostly in memory, as this part of the documentation clearly indicates.
So, it depends on what directory you gave to Apache Spark.