Spark: efficiency of dataframe checkpoint vs. explicitly writing to disk
Asked Answered
D

3

26

Checkpoint version:

val savePath = "/some/path"
spark.sparkContext.setCheckpointDir(savePath)
df.checkpoint()

Write to disk version:

df.write.parquet(savePath)
val df = spark.read.parquet(savePath)

I think both break the lineage in the same way.

In my experiments checkpoint is almost 30 times bigger on disk than parquet (689GB vs. 24GB). In terms of running time, checkpoint takes 1.5 times longer (10.5 min vs 7.5 min).

Considering all this, what would be the point of using checkpoint instead of saving to file? Am I missing something?

Dav answered 9/8, 2018 at 17:25 Comment(0)
J
12

Checkpointing is a process of truncating RDD lineage graph and saving it to a reliable distributed (HDFS) or local file system. If you have a large RDD lineage graph and you want freeze the content of the current RDD i.e. materialize the complete RDD before proceeding to the next step, you generally use persist or checkpoint. The checkpointed RDD then could be used for some other purpose.

When you checkpoint the RDD is serialized and stored in Disk. It doesn't store in parquet format so the data is not properly storage optimized in the Disk. Contraty to parquet which provides various compaction and encoding to store optimize the data. This would explain the difference in the Size.

  • You should definitely think about checkpointing in a noisy cluster. A cluster is called noisy if there are lots of jobs and users which compete for resources and there are not enough resources to run all the jobs simultaneously.

  • You must think about checkpointing if your computations are really expensive and take long time to finish because it could be faster to write an RDD to HDFS and read it back in parallel than recompute from scratch.

And there's a slight inconvenience prior to spark2.1 release; there is no way to checkpoint a dataframe so you have to checkpoint the underlying RDD. This issue has been resolved in spark2.1 and above versions.

The problem with saving to Disk in parquet and read it back is that

  • It could be inconvenient in coding. You need to save and read multiple times.
  • It could be a slower process in the overall performance of the job. Because when you save as parquet and read it back the Dataframe needs to be reconstructed again.

This wiki could be useful for further investigation

As presented in the dataset checkpointing wiki

Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with previously computed state of a distributed computation described as an RDD. That has been successfully used in Spark Streaming - the now-obsolete Spark module for stream processing based on RDD API.

Checkpointing truncates the lineage of a RDD to be checkpointed. That has been successfully used in Spark MLlib in iterative machine learning algorithms like ALS.

Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of a Dataset being checkpointed.

Japan answered 9/8, 2018 at 19:43 Comment(11)
Thanks for your answer. A few comments: checkpoint is provided by the DF api, though in the background it might checkpoint the underlying RDD. Since writing to disk explicitly seems to be more time and space efficient why would I use checkpoint in this case? Also, saving to file allows me to delete the "checkpoint" directory when I'm done, while checkpoint saves to a random directory name which makes it hard to delete afterwards.Dav
It could be inconvenient in coding. You need to save and read multiple times. It could be a slower process in the overall performance of the job. Because when you save as parquet and read it back the Dataframe needs to be reconstructed again.Japan
You could pass an argument in the sparkjob for the racdom directory and delete that from HDFS at the end of the spark job.Japan
In my tests saving to parquet and reconstructing the dataframe is faster than checkpointing, do you think this doesn’t hold in general?Dav
In generally probably however I am not sure here.. Could you please try to ask this in spark forums. The documentation doesn't specify anything about this.Japan
I'm not sure this answers the question. Your two claims on the advantages of checkpoint over parquet are that checkpoint might be faster, but in OP's case that's not true, and that saving as parquet might require more effort, which again is not true in OP's case (both little two line things to save and load).Tedious
The problem is this could not be true for this special case however may not be generalized for all cases. Now why in this case the checkpoint is worse is probably require further dive deep in the code and the actual data. Now my knowledge here is limited by the documentation which doesn't provide a clear indication about this. So yes this is required further research. If I found something more I shall update.Japan
@AvishekBhattacharya Your comment ... faster to write to an RDD to HDFS ... I assume that we need to code for this and that if Spark needs to recover it knows that it needs to start from there as lineage was broken. Just checking.Gametocyte
@thebluephantom, yes we need to code for this. If spark fails in subsequent stages after the rdd is written to HDfS, it will know from where to start.Japan
@AvishekBhattacharya I performed this: val cp = ds.checkpoint() and could see that checkpoint occurred in a directory I specified. So, what do you mean need to checkpoint underlying rdd?Gametocyte
@Gametocyte Thanks for pointing this out. When I wrote the answer I thought the checkpointing is only supported in RDD as per the issue: issues.apache.org/jira/browse/SPARK-11879. I see the support has been aded spark 2.1 and above. Sorry for this. I used this feature in previous spark 1.* versions where you need to do df.rdd.checkpoint. I have updated my answer. Thanks again for pointing this out !!! Really appreciate !Japan
K
1

I guess it depends on the goals of what you are trying to do.

Before I discovered checkpoint, I was doing the write and read from parquet method which I noticed truncated the lineage. I noticed that Checkpoint was accomplishing the same thing but performs worse. I ended up sticking with write and read.

You can find some tests on the various methods here:

https://spokeo-engineering.medium.com/whats-the-fastest-way-to-store-intermediate-results-in-spark-54f2080defb6

Kipp answered 4/10, 2023 at 17:57 Comment(0)
S
0

One difference is that if your spark job needs a certain in memory partitioning scheme, eg if you use a window function, then checkpoint will persist that to disk, whereas writing to parquet will not.

I'm not aware of a way with the current versions of spark to write parquet files and then read them in again, with a particular in memory partitioning strategy. Folder level partitioning doesn't help with this.

Spleeny answered 10/9, 2021 at 17:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.