What is the difference between Spark checkpoint and persist to disk?

What is the difference between Spark checkpoint and persist to disk? Are both of these stored on the local disk?

Gustation answered 1/2, 2016 at 10:6 Comment(2)
It is a very generic question; it would be better to add some context around it. To answer your question: it can be stored in any persistent storage area - local disk, HDFS, NFS-mounted space, etc.Lowney
@Lowney - This is a very specific question about the differences between two Spark RDD methods. The answer can be objective and focused, as zero323's answer below demonstrates.Rumsey

There are a few important differences, but the fundamental one is what happens with lineage. Persist / cache keeps the lineage intact while checkpoint breaks the lineage. Let's consider the following examples:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
  • cache / persist:

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
  • checkpoint:

    // note: sc.setCheckpointDir(...) has to be set before calling checkpoint
    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    

As you can see, in the first case the lineage is preserved even if the data is fetched from the cache. It means that the data can be recomputed from scratch if some partitions of indCache are lost. In the second case the lineage is completely lost after the checkpoint and indChk no longer carries the information required to rebuild it.

checkpoint, unlike cache / persist, is computed in a separate job, after the action that materializes the RDD. That's why an RDD marked for checkpointing should be cached:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
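A minimal sketch of that recommendation, reusing the rdd defined above (the checkpoint directory is illustrative; on a cluster it should point at a DFS location such as HDFS):

    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val indBoth = rdd.mapValues(_ > 4)
    indBoth.persist(StorageLevel.MEMORY_ONLY)  // cache first, as the docs recommend
    indBoth.checkpoint()                       // mark for checkpointing
    indBoth.count()                            // the action fills the cache; the follow-up
                                               // checkpoint job reads it instead of recomputing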

Finally, checkpointed data is persistent and is not removed after the SparkContext is destroyed.

Regarding data storage, SparkContext.setCheckpointDir, which is used by RDD.checkpoint, requires a DFS path when running in non-local mode; otherwise it can be the local file system as well. localCheckpoint and persist without replication use the local file system.
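For example, a rough sketch of the local variant: localCheckpoint stores partitions through the caching layer on the executors, trading reliability for speed, and needs no checkpoint directory:

    val localChk = rdd.mapValues(_ > 4)
    localChk.localCheckpoint()  // uses executor-local storage via the block manager
    localChk.count()            // after the action the lineage is truncated, but the data is
                                // not reliable: losing an executor loses those partitions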

Important Note:

RDD checkpointing is a different concept from checkpointing in Spark Streaming. The former is designed to address the lineage issue; the latter is all about streaming reliability and failure recovery.
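For contrast, a rough sketch of the Spark Streaming flavour (the directory and batch interval are illustrative); it is about recovering the driver and its progress after a failure, not about trimming an RDD lineage:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(10))
      ssc.checkpoint("/tmp/streaming-checkpoints")  // metadata / state checkpointing
      // ... define input DStreams and transformations here ...
      ssc
    }

    // On restart the context and its progress are recovered from the checkpoint
    // directory if one exists; otherwise createContext() builds a fresh one.
    val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoints", createContext _)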

Clamber answered 1/2, 2016 at 12:10 Comment(9)
A little bit confused here. If I have a long computation chain and I decide to break it, assign it to a variable, and cache it, then I will read that variable and go forward. Now my question is, how is that different from checkpointing (apart from the recomputation perspective)? My real question is, in what situation should we go for checkpointing instead of the above-mentioned process, given that the checkpointed RDD will not be used in the future? Clarification will be helpful :)Hygrometer
Another important difference is that if you persist / cache an RDD, and later dependent RDD-s need to be calculated, then the persisted/cached RDD content is used automatically by Spark to speed up things. But if you just checkpoint the same RDD, it won't be utilized when calculating dependent RDD-s. I wonder when a checkpointed RDD is used by Spark. If there's a failure, will Spark use it automatically? Or am I supposed to spark.read it manually to get the RDD to continue from? That would explain why it's never deleted by Spark. So, how are checkpoints supposed to be used?Chadwick
What is the recommendation for DStream with CustomReceiver implementations? Should they be checkpointed as is the norm for Kafka receivers or should they be persisted?Peduncle
@Hygrometer - re: in what situation should checkpointing be used? Checkpoints hang around even after the job is finished (while persisted-to-disk blocks might be cleaned up). So one thing (which I have read) that checkpoints are useful for is if you have a flaky long running job that sometimes fails in a busy cluster, checkpointing can help you recover faster because on restart you don't have to go all the way back to the beginning of the job.Carrero
@Chadwick - the response above might be interesting to you too! hope it is useful ;^)Carrero
Thanks @Chris. Just for the sake of discussion, I have noticed situations where the dataframe was being used only once, but after using checkpoint, the job finished relatively faster (like around 40% faster in a recent encounter).Hygrometer
Regarding your "Important Note", the Spark documentation does not say that. spark.apache.org/docs/latest/… In fact, in one of the Spark Summit talks, the presenter said that checkpointing helps to break the lineage, which would otherwise keep growing over time. Could you please give me some reference to support your point?Towill
So, in which case should we use checkpoint? It seems it is just like saving an RDD and then loading an RDD. Is it the same?Jakejakes
@Chadwick " If there's a failure, will Spark use it automatically? Or am I supposed to spark.read it manually to get the RDD to continue from?" Yes, spark will automatic read data from checkpoint. In my prespective, the broke lineage with father rdd,will let you no need to recompute from source, which will save lots of time.Firecrest

I think you can find a very detailed answer here

While it is very hard to summarize everything on that page, I will say:

Persist

  • Persisting or caching with StorageLevel.DISK_ONLY causes the generation of the RDD to be computed and stored in a location such that subsequent uses of that RDD will not go beyond that point in recomputing the lineage.
  • After persist is called, Spark still remembers the lineage of the RDD even if it does not need to recompute it.
  • After the application terminates, the cache is cleared and the files are destroyed.

Checkpointing

  • Checkpointing stores the RDD physically to HDFS and destroys the lineage that created it.
  • The checkpoint files won't be deleted even after the Spark application terminates.
  • Checkpoint files can be used in subsequent job runs or driver programs.
  • Checkpointing an RDD causes double computation: unless the RDD is also cached, it is computed once for the action and then recomputed by the separate job that writes it to the checkpoint directory (see the sketch after this list).
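A hedged sketch (not from the linked article) that makes the double computation visible with an accumulator; the data, path, and numbers are illustrative:

    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val evals = sc.longAccumulator("map evaluations")
    val nums  = sc.parallelize(1 to 10).map { x => evals.add(1); x * 2 }
    nums.checkpoint()
    nums.count()
    println(evals.value)  // expect roughly 20: ~10 for the count job plus ~10 for the
                          // separate checkpoint job; adding nums.cache() before the
                          // action should bring this back down to ~10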

You may want to read the article for more details on the internals of Spark's checkpoint and cache operations.

Convection answered 12/10, 2016 at 15:31 Comment(0)

  1. Persist(MEMORY_AND_DISK) will store the data frame to disk and memory temporarily, without breaking the lineage of the program, i.e. df.rdd.toDebugString() would return the same output. It is recommended to use persist(*) on a calculation that is going to be reused, to avoid recalculation of intermediate results:

    from pyspark import StorageLevel  # needed for the storage level constant

    df = df.persist(StorageLevel.MEMORY_AND_DISK)
    calculation1(df)  # calculation1/calculation2 stand in for whatever reuses df
    calculation2(df)
    

    Note that caching the data frame does not guarantee that it will remain in memory until you use it next time. Depending on memory usage, the cache can be discarded.

  2. checkpoint(), on the other hand, breaks the lineage and forces the data frame to be stored on disk. Unlike cache()/persist(), frequent checkpointing can slow down your program. Checkpointing is recommended when a) working in an unstable environment, to allow fast recovery from failures, or b) storing intermediate states of a calculation where new entries of the RDD depend on previous entries, i.e. to avoid recomputing a long dependency chain in case of failure (a minimal sketch follows below).
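    A minimal sketch of this pattern, in Scala for consistency with the earlier answer (the column names, loop, and checkpoint directory are illustrative); checkpointing every few iterations keeps both the query plan and the lineage from growing indefinitely:

    import spark.implicits._
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    var df = spark.range(0, 1000).withColumn("bucket", $"id" % 10)
    for (i <- 1 to 20) {
      df = df.withColumn("bucket", $"bucket" + 1)
      if (i % 5 == 0) df = df.checkpoint()  // eager by default: materializes to the checkpoint dir
    }
    df.count()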

Silvia answered 18/9, 2017 at 22:17 Comment(0)

If you check the relevant part of the documentation, it talks about writing data to a reliable system, e.g. HDFS. But it is up to you to tell Apache Spark where to write its checkpoint information.

On the other hand, persisting is about caching data mostly in memory, as this part of the documentation clearly indicates.

So, it depends on what directory you gave to Apache Spark.

Impetuous answered 1/2, 2016 at 12:21 Comment(1)
Persistence in streaming is quite a different issue and not really related to caching.Clamber
