Why does calling cache take a long time on a Spark Dataset?
I'm loading large datasets and then caching them for reference throughout my code. The code looks something like this:

val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials","true")
  .option("query", "SELECT * FROM my_table "+
                   "WHERE date <= '2017-06-03' "+
                   "AND date >= '2017-03-06' ")
  .load()
  .cache()

If I leave off the cache, the code executes quickly because Datasets are evaluated lazily. But if I put on the cache(), the block takes a long time to run.
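The lazy behaviour described above can be checked in isolation. The following is a minimal sketch, not the original job: it assumes a local SparkSession and uses a generated Dataset (`spark.range`) as a stand-in for the Redshift read, and the `timed` helper and object name are hypothetical. It times the `cache()` call separately from the first action, so you can see which of the two does the work for a given source.

```scala
import org.apache.spark.sql.SparkSession

object CacheLazinessSketch {
  // Hypothetical helper: run a block and return (result, elapsed milliseconds).
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session instead of the Databricks cluster in the question.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-laziness-sketch")
      .getOrCreate()

    // Stand-in for the Redshift read: a locally generated Dataset.
    val numbers = spark.range(0L, 10000000L)

    // Time the cache() call on its own.
    val (cached, markMs) = timed(numbers.cache())
    println(s"cache() returned after $markMs ms, storage level: ${cached.storageLevel}")

    // Time the first action, which is where the data is actually computed and stored.
    val (rows, actionMs) = timed(cached.count())
    println(s"first action counted $rows rows in $actionMs ms")

    spark.stop()
  }
}
```

Running the same two timings against the real Redshift read would show whether the delay comes from `cache()` itself or from something the data source does when the plan is built.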

From the online Spark UI's Event Timeline, it appears that the SQL table is being transmitted to the worker nodes and then cached on the worker nodes.

Why is cache executing immediately? The source code appears only to mark the data for caching, deferring the actual work until the data is computed.

When cache or persist is called, the source code for Dataset calls through to this code in CacheManager.scala:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }

This appears only to mark the plan for caching rather than actually cache the data. Based on other answers on Stack Overflow, I would also expect cache() to return immediately.

Has anyone else seen caching happen immediately, before any action is performed on the dataset? Why does this happen?

Mirabel answered 31/7, 2017 at 15:34 Comment(8)
Which Spark version are you using? - Africa
Spark 2.1.1 on Databricks. - Mirabel
This question seemed similar, but your example has no sorting, hmmm... #42952439 - Olivia
Actually, I think it does, see "WHERE date <= '2017-06-03' " + "AND date >= '2017-03-06' ". I would expect that to compile to a sort operation. - Courtland
Date was a partition of the Redshift database, so it should not require a sort in this case. - Mirabel
You could check the debug output or the explain of the load to see whether, for some reason, a sort (or repartition) gets added to the DAG. - Courtland
I wonder whether it has to do with the data source, in this case "com.databricks.spark.redshift". If you apply the same logic to a dataset that is already on your data lake disk, such as one in Parquet, Avro, or CSV format, does the same thing happen? It might be too late for your interest :) since this question is four years old. - Crossbones
@Crossbones Yes, too late for my interest. But you never know who else might be interested in answering this unanswered question at this point. Apparently there are at least ten others out there who may still be interested, given the current upvotes :-). - Mirabel

I now believe that, as Erik van Oosten answers, the cache() command causes the query to execute.

A close look at the code in my OP does indeed appear to show that the data is being cached. I think the caching occurs in two key lines:

cachedData.add(CachedData(...))

This line creates a new CachedData object and adds it to the cachedData collection. While the CachedData object may be only a placeholder for data that is cached later on, it seems more likely that it truly holds cached data.

And more importantly, this line:

sparkSession.sessionState.executePlan(planToCache).executedPlan

appears to actually execute the plan. So, based on my experience, Erik van Oosten's gut feeling about what's going on here, and the source code, I believe that calling cache() causes a Spark Dataset's plan to be executed.
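One way to test this reading is to count how many rows are processed at each step. The sketch below is only an experiment under stated assumptions: a local SparkSession, a generated Dataset instead of the Redshift table, and a hypothetical `rowsSeen` accumulator and `measure` function. It reads the accumulator after building the physical plan through `queryExecution.executedPlan`, after `cache()`, and after the first action. With a built-in source like this one, the first two readings are expected to stay at zero. A connector that does work while its scan is being planned could behave differently, and whether the Redshift connector does so is an assumption that this sketch does not settle.

```scala
import org.apache.spark.sql.SparkSession

object PlanVsRunSketch {
  // Returns rows processed (after planning, after cache(), after the first action).
  def measure(spark: SparkSession): (Long, Long, Long) = {
    import spark.implicits._

    // Hypothetical counter: incremented once for every row a task processes.
    val rowsSeen = spark.sparkContext.longAccumulator("rowsSeen")

    val ds = spark.range(0L, 1000L).map { id =>
      rowsSeen.add(1L)
      id.longValue
    }

    // Building the physical plan: the same kind of call as in the quoted cacheQuery code.
    val physicalPlan = ds.queryExecution.executedPlan
    println(s"physical plan root: ${physicalPlan.nodeName}")
    val afterPlanning = rowsSeen.value.longValue

    // Marking the Dataset for caching.
    ds.cache()
    val afterCache = rowsSeen.value.longValue

    // First action: this is where rows are computed and stored.
    ds.count()
    val afterCount = rowsSeen.value.longValue

    ds.unpersist()
    (afterPlanning, afterCache, afterCount)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, not the Databricks cluster from the question.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("plan-vs-run-sketch")
      .getOrCreate()

    val (afterPlanning, afterCache, afterCount) = measure(spark)
    println(s"rows seen after planning: $afterPlanning")
    println(s"rows seen after cache():  $afterCache")
    println(s"rows seen after count():  $afterCount")

    spark.stop()
  }
}
```

Swapping the generated Dataset for the real read, and watching the Spark UI or the connector's logs instead of an accumulator, would apply the same comparison to the case in the question.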

Mirabel answered 3/5, 2022 at 16:2 Comment(0)

cache is one of those operators that causes execution of a dataset. Spark will materialize that entire dataset to memory. If you invoke cache on an intermediate dataset that is quite big, this may take a long time.

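What might be problematic is memory pressure. For an RDD, cache() stores the data in memory only; when it no longer fits, partitions get evicted and are recalculated as needed (see https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). For a Dataset, the doc comment quoted in the question says the default storage level is MEMORY_AND_DISK, so partitions that do not fit in memory are written to disk instead. Either way, with too little memory present, your program could spend a lot of time on recalculation or disk I/O.

To speed things up with caching, you could give the application more memory. On an RDD you can also try persist(StorageLevel.MEMORY_AND_DISK) instead of cache(); on a Dataset that is already the default.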
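As a minimal sketch of choosing the storage level explicitly, assuming a local SparkSession and generated data rather than the Redshift table (the object name is hypothetical): `persist` takes a `StorageLevel`, and the level in effect can be read back with `storageLevel` on a Dataset or `getStorageLevel` on an RDD.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistLevelSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session and generated data, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("persist-level-sketch")
      .getOrCreate()

    // Dataset: request memory with spill-to-disk explicitly.
    // Per the doc comment quoted in the question, this is also what cache() uses for Datasets.
    val ds = spark.range(0L, 1000000L)
    ds.persist(StorageLevel.MEMORY_AND_DISK)
    println(s"Dataset storage level: ${ds.storageLevel}")
    ds.count()      // the first action fills the cache
    ds.unpersist()  // release the cached blocks when they are no longer needed

    // RDD: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
    val rdd = spark.sparkContext.parallelize(1 to 1000000)
    rdd.cache()
    println(s"RDD storage level: ${rdd.getStorageLevel}")
    rdd.count()
    rdd.unpersist()

    spark.stop()
  }
}
```

Calling `unpersist()` once the cached data is no longer referenced frees memory for other cached Datasets in the same application.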

Ahasuerus answered 19/4, 2022 at 12:40 Comment(3)
Thank you. You speak as one who knows. Are you a member of the Scala group? I'm still a little confused by this answer, as I did not see the entire dataset being materialized in the source code, and the documentation does not make any mention of it happening. Are you confident this answer is correct? - Mirabel
Oh wait, is this it? (Quoted from my OP): sparkSession.sessionState.executePlan(planToCache).executedPlan - Mirabel
I am just a random Spark user. Even so, I am pretty confident what I wrote is correct. Spark people do not use the word materialize. They say (from the same link): "One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use." - Ahasuerus
