(Why) do we need to call cache or persist on an RDD
D

5

192

When a resilient distributed dataset (RDD) is created from a text file or a collection (or from another RDD), do we need to call "cache" or "persist" explicitly to store the RDD data in memory? Or is the RDD data stored in memory in a distributed way by default?

val textFile = sc.textFile("/user/emp.txt")

As per my understanding, after the above step, textFile is an RDD and is available in the memory of all or some of the nodes.

If so, why do we need to call "cache" or "persist" on the textFile RDD then?

Deteriorate answered 11/3, 2015 at 8:8 Comment(0)
J
337

Most RDD operations are lazy. Think of an RDD as a description of a series of operations. An RDD is not data. So this line:

val textFile = sc.textFile("/user/emp.txt")

It does nothing. It creates an RDD that says "we will need to load this file". The file is not loaded at this point.

RDD operations that require observing the contents of the data cannot be lazy. (These are called actions.) An example is RDD.count — to tell you the number of lines in the file, the file needs to be read. So if you write textFile.count, at this point the file will be read, the lines will be counted, and the count will be returned.

What if you call textFile.count again? The same thing: the file will be read and counted again. Nothing is stored. An RDD is not data.
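A minimal sketch of this re-computation, assuming Spark 2.0 or later running in local mode. It uses an in-memory collection instead of /user/emp.txt so that it is self-contained; the object name, the accumulator name and the 100-element range are illustrative assumptions, not part of the answer. The accumulator counts how often the map function actually runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RecomputeWithoutCache {
  // Returns how many times the map function ran after two count() actions.
  def run(): Long = {
    val conf = new SparkConf().setAppName("recompute-without-cache").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val evaluated = sc.longAccumulator("elements evaluated")
      // Lazy: this line only describes the work, nothing runs yet.
      val numbers = sc.parallelize(1 to 100).map { n => evaluated.add(1); n }
      numbers.count() // first action: map runs on all 100 elements
      numbers.count() // second action: map runs on all 100 elements again
      evaluated.value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"map function ran ${run()} times") // 200 when no task is retried
}
```

Nothing is kept between the two actions, so the second count pays the full cost again.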

So what does RDD.cache do? If you add textFile.cache to the above code:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

It does nothing. RDD.cache is also a lazy operation. The file is still not read. But now the RDD says "read this file and then cache the contents". If you then run textFile.count the first time, the file will be loaded, cached, and counted. If you call textFile.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines.
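As a rough illustration, assuming Spark 2.0 or later in local mode and data small enough to fit in memory: the sketch below marks an RDD with cache and runs two actions. The in-memory collection, the object name and the accumulator are assumptions made for the example; the accumulator counts how often the map function runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountWithCache {
  // Returns how many times the map function ran after two count() actions
  // on an RDD that was marked with cache().
  def run(): Long = {
    val conf = new SparkConf().setAppName("count-with-cache").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val evaluated = sc.longAccumulator("elements evaluated")
      val numbers = sc.parallelize(1 to 100).map { n => evaluated.add(1); n }
      numbers.cache() // lazy: only marks the RDD, nothing is computed yet
      numbers.count() // first action: computes the partitions and caches them
      numbers.count() // second action: reads the cached partitions
      evaluated.value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"map function ran ${run()} times") // 100 when everything fits in memory
}
```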

The cache behavior depends on the available memory. If the file does not fit in memory, for example, the partitions that do not fit are not cached, and textFile.count falls back to the usual behavior for them and re-reads those parts of the file.
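One way to avoid re-reading when memory is short is to pick a storage level that can use disk. A minimal sketch, assuming local mode; the temporary file with three lines is a hypothetical stand-in for /user/emp.txt so that the example runs anywhere.

```scala
import java.nio.file.Files
import java.util.Arrays
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistWithSpill {
  // Returns the results of two count() actions on the same persisted RDD.
  def run(): (Long, Long) = {
    // Hypothetical stand-in for /user/emp.txt.
    val path = Files.createTempFile("emp-", ".txt")
    Files.write(path, Arrays.asList("alice", "bob", "carol"))
    val conf = new SparkConf().setAppName("persist-with-spill").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val textFile = sc.textFile(path.toUri.toString)
      // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): partitions
      // that do not fit in memory are not cached and are recomputed when needed.
      // MEMORY_AND_DISK stores those partitions on local disk instead.
      textFile.persist(StorageLevel.MEMORY_AND_DISK)
      val first = textFile.count()  // reads the file and stores the partitions
      val second = textFile.count() // reads the stored partitions
      textFile.unpersist()          // release the storage when done
      (first, second)
    } finally {
      sc.stop()
      Files.deleteIfExists(path)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```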

Juryman answered 11/3, 2015 at 10:13 Comment(7)
Hi daniel, when you call cache, does this mean that the RDD is not reloaded from the source (e.g. a text file)? How can you be sure that the data from the text file is the most recent when it's cached? (Does Spark figure this out, or is it a manual operation to unpersist() periodically to ensure the source data gets recomputed later in the lineage?)Khelat
Also, if you must unpersist periodically and you have a cached RDD that depends on another cached RDD, must you unpersist both RDDs to see recomputed results?Khelat
Spark just assumes the file will never change. It reads the file at an arbitrary point in time and may re-read parts of it as necessary later. (E.g. if a piece of the data was pushed out from cache.) So you better keep your files unchanging! Just create a new file with a new name when you have new data, then load it as a new RDD. If you are continuously getting new data, look into Spark Streaming.Juryman
what about if you have already constructed a very large computation tree, such as, Load Text file RDD_A, join to Another RDD_B, filter the results of that join and put in RDD_C_fieldA -> construct new RDD_D, count based on RDD_D_fieldB. Does this mean the whole tree will need to be reconstructed when new data changes at RDD_A?Khelat
Yes. RDDs are immutable, so every RDD assumes its dependencies are immutable as well. Spark Streaming allows you to set up such trees that operate on a stream of changes. But an even simpler solution is to build the tree in a function that takes a file name as its parameter. Then just call the function for the new file and poof, you've got the new computation tree.Juryman
Daniel, thanks for the good answer. I have a question regarding Spark caching. In my case, (1) I loaded an 11 GB text file and ran count(), which took around 2.7 min, and (2) then I did the same but cached it before count(). After running, I immediately looked at the Spark Web UI for metrics: at the beginning the progress bar showed 40% of the data processed (in 14 sec), but then the speed decreased to the same as in the first run (overall time was 2.2 min). I thought caching works at cluster scale, but after this I guess caching works at the scale of just one node. Is that right? (My cluster has 5 nodes.)Horsewhip
@Humoyun: On the Storage tab of Spark UI you can see how much of each RDD is cached. The data may be so big that only 40% of it fits in the total memory you have for caching. One option in this case is to use persist and pick a storage option that allows spilling the cache data to disk.Juryman
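The suggestion in the comments above, building the computation tree in a function that takes a file name as its parameter, could look roughly like this. It is only a sketch: the pipeline steps (trim, drop empty lines, count) and the temporary files are placeholders invented for the example.

```scala
import java.nio.file.Files
import java.util.Arrays
import org.apache.spark.{SparkConf, SparkContext}

object LineagePerFile {
  // Builds the whole computation tree for one input file and runs it.
  // Calling it with a new path creates a fresh set of RDDs for the new data.
  def nonEmptyLineCount(sc: SparkContext, path: String): Long =
    sc.textFile(path).map(_.trim).filter(_.nonEmpty).count()

  // Hypothetical helper: writes the lines to a temporary file and returns its URI.
  private def writeTempFile(lines: String*): String = {
    val path = Files.createTempFile("emp-", ".txt")
    path.toFile.deleteOnExit()
    Files.write(path, Arrays.asList(lines: _*))
    path.toUri.toString
  }

  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("lineage-per-file").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val oldFile = writeTempFile("alice", "", "bob")
      val newFile = writeTempFile("alice", "", "bob", "carol") // new data, new file name
      (nonEmptyLineCount(sc, oldFile), nonEmptyLineCount(sc, newFile))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // (2,3)
}
```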
B
210

I think the question would be better formulated as:

When do we need to call cache or persist on an RDD?

Spark processes are lazy; that is, nothing will happen until it's required. To quickly answer the question: after val textFile = sc.textFile("/user/emp.txt") is issued, nothing happens to the data; only a HadoopRDD is constructed, using the file as its source.

Let's say we transform that data a bit:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Again, nothing happens to the data. Now there's a new RDD wordsRDD that contains a reference to textFile and a function to be applied when needed.

Only when an action is called upon an RDD, like wordsRDD.count, will the RDD chain, called the lineage, be executed. That is, the data, broken down into partitions, will be loaded by the Spark cluster's executors, the flatMap function will be applied, and the result will be calculated.
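To look at the lineage before running it, RDD.toDebugString prints the chain of RDDs that an action would execute. A small sketch, assuming local mode; the two input lines are made up and replace the file so that the example is self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShowLineage {
  // Prints the lineage of wordsRDD, then runs an action on it and returns the result.
  def run(): Long = {
    val conf = new SparkConf().setAppName("show-lineage").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("to be or not to be", "that is the question"))
      val wordsRDD = lines.flatMap(line => line.split("\\W"))
      // Still lazy: this only prints the description of the work.
      println(wordsRDD.toDebugString)
      // The action triggers the execution of the whole chain.
      wordsRDD.count()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"words: ${run()}") // 10 for the two sample lines
}
```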

On a linear lineage, like the one in this example, cache() is not needed. The data will be loaded to the executors, all the transformations will be applied and finally the count will be computed, all in memory - if the data fits in memory.

cache is useful when the lineage of the RDD branches out. Let's say you want to filter the words of the previous example into a count for positive and negative words. You could do it like this:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Here, each branch issues a reload of the data. Adding an explicit cache statement will ensure that processing done previously is preserved and reused. The job will look like this:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

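For that reason, cache is sometimes said to 'break the lineage', since it creates a point that can be reused for further processing. Strictly speaking, the lineage itself is kept so that lost cached partitions can be recomputed; truncating it is what RDD.checkpoint does.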

Rule of thumb: Use cache when the lineage of your RDD branches out or when an RDD is used multiple times like in a loop.
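A self-contained version of the branching job, as a sketch only: isPositive and isNegative are not defined in the answer, so the word sets below are hypothetical stand-ins, and the two sample sentences replace the input file.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BranchingLineage {
  // Hypothetical word lists standing in for the undefined isPositive / isNegative.
  private val positiveWords = Set("good", "great")
  private val negativeWords = Set("bad", "awful")
  def isPositive(word: String): Boolean = positiveWords.contains(word.toLowerCase)
  def isNegative(word: String): Boolean = negativeWords.contains(word.toLowerCase)

  // Returns (positive word count, negative word count).
  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("branching-lineage").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("good food but bad service", "great view, awful coffee"))
      val wordsRDD = lines.flatMap(line => line.split("\\W")).filter(_.nonEmpty)
      wordsRDD.cache() // both branches below reuse the split words
      val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
      val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
      wordsRDD.unpersist() // free the cached partitions once both branches are done
      (positiveWordsCount, negativeWordsCount)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // (2,2)
}
```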

Bicipital answered 11/3, 2015 at 10:48 Comment(7)
Awesome. Thanks. One more related question: when we cache or persist, will the data be stored in the executor's memory or the worker node's memory? If it is the executor's memory, how does Spark identify which executor has the data?Deteriorate
@RamanaUppala the executor memory is used. The fraction of executor memory used for caching is controlled by the config spark.storage.memoryFraction (since Spark 1.6, unified memory management uses spark.memory.fraction and spark.memory.storageFraction instead). Regarding which executor has which data, an RDD keeps track of its partitions, which are distributed over the executors.Bicipital
@Bicipital Correct me if I am wrong but neither cache nor persist can break the lineage.Guadalcanal
Where would the wordsRDD be stored if we didn't have the .cache() statement in the above example?Toul
What if, before the two counts, we union the two branches back into one RDD and count? In this case, is the cache beneficial?Muna
@sun_dare: Nowhere, since the above example is not branching (shuffling) anywhere.Coadjutor
When we do a broadcast on a DataFrame that has, let's say, 8 partitions, does Spark first send all the data to the driver, which then does the broadcasting, or is it something else?Aarika
T
31

Do we need to call "cache" or "persist" explicitly to store the RDD data into memory?

Yes, only if needed.

Is the RDD data stored in a distributed way in memory by default?

No!

And these are the reasons why:

  • Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

  • RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

  • All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

  • By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
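The map and reduce description above can be sketched as follows, assuming local mode; the three sample strings and the object name are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MapThenReduce {
  // Returns the total length of all lines.
  def run(): Int = {
    val conf = new SparkConf().setAppName("map-then-reduce").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("a", "bb", "ccc"))
      // Transformation: lazily describes a new RDD, nothing is computed here.
      val lineLengths = lines.map(_.length)
      // Action: runs the computation and returns only the final sum to the driver.
      lineLengths.reduce(_ + _)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"total length: ${run()}") // 6
}
```

If lineLengths were needed by a second action, calling lineLengths.persist() before the first one would keep it from being recomputed.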

For more details please check the Spark programming guide.

Trichromatism answered 11/3, 2015 at 8:48 Comment(9)
That didn't answer my question.Deteriorate
What doesn't answer it?Trichromatism
When the RDD's data is stored in memory by default, why do we need to call cache or persist?Deteriorate
RDDs are not stored in memory by default, so persisting the RDD makes Spark perform transformations faster on the clusterTrichromatism
So only when we call persist or cache is it really a "Resilient Distributed Dataset"? Otherwise it is not a distributed dataset and the data is not stored in memory. Is that correct?Deteriorate
Your question is very interesting. Actually, Spark distributes the RDD on the cluster when it's performing an action or a transformation. Caching just persists its distributed state so it doesn't need to be re-distributed again each time you need to work on it, thus the transformation goes faster.Trichromatism
It's a good answer, I don't know why it was downvoted. It's a top-down answer, explaining how RDDs work from the high-level concepts. I've added another answer that goes from bottom-up: starting from "what does this line do". Maybe it's easier to follow for someone just starting out with Spark.Juryman
@Trichromatism I downvoted it because I find the affirmations at the beginning misleading for someone not reading the whole answer: "Do we need to call "cache" or "persist" explicitly to store the RDD data into memory? Yes, but only if needed." E.g. ALL linear RDD transformations will not benefit from caching, and yet the data will be loaded and processed in memory.Bicipital
Thanks for your remark. The question concerns the action of storing the RDD and not the need for caching, and that's why I didn't write "but only if needed". :)Trichromatism
H
12

Below are the three situations in which you should cache your RDDs:

using an RDD many times

performing multiple actions on the same RDD

for long chains of (or very expensive) transformations
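As an illustration of the first two situations, here is a small sketch in which one RDD is reused by several actions in a loop. It assumes local mode; the squared numbers and the thresholds are arbitrary values chosen for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReuseInLoop {
  // Returns, for each threshold, how many squares are larger than it.
  def run(): Seq[Long] = {
    val conf = new SparkConf().setAppName("reuse-in-loop").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stands in for an expensive chain of transformations.
      val squares = sc.parallelize(1 to 1000).map(n => n * n).cache()
      val thresholds = List(10, 100, 1000)
      // Each iteration runs one action; only the first one computes the squares.
      val counts = thresholds.map(t => squares.filter(_ > t).count())
      squares.unpersist()
      counts
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // List(997, 990, 969)
}
```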

Hass answered 16/10, 2017 at 22:12 Comment(0)
N
8

Adding another reason to add (or temporarily add) a cache method call:

For debugging memory issues

With the cache method, Spark will give debugging information regarding the size of the RDD. So in the Spark integrated UI, you will get RDD memory consumption info, and this has proved very helpful for diagnosing memory issues.
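A small sketch of how this could be set up, assuming local mode. Giving the RDD a name with setName makes it easier to spot on the Storage tab of the UI (served by the driver on port 4040 by default while the application is running). The name "numbers" and the range are arbitrary choices for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NameCachedRdd {
  // Returns "name: storage level" for every RDD currently marked as persistent.
  def run(): List[String] = {
    val conf = new SparkConf().setAppName("name-cached-rdd").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val numbers = sc.parallelize(1 to 100000).setName("numbers").cache()
      numbers.count() // materializes the cache so that its size shows up in the UI
      // Lists the RDDs that were marked with cache() or persist().
      sc.getPersistentRDDs.values
        .map(rdd => s"${rdd.name}: ${rdd.getStorageLevel.description}")
        .toList
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

To inspect the Storage tab itself, keep the application alive (for example in spark-shell) instead of stopping the context right away.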

Nordin answered 18/4, 2017 at 11:24 Comment(0)
