How to check if data is cached in a DataFrame or not yet cached due to lazy execution in PySpark?
My question is a little different from the other questions I could find on Stack Overflow: I need to know whether the data has already been retrieved and stored in a DataFrame, or whether that is yet to happen.

I am doing something like this:

df1 = spark.table("sourceDB.Table1")
df1.cache()

Now, as you might be aware, the data has not been read from the source table yet due to lazy execution. So at this point I need an expression that evaluates to "False".
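To make the timeline concrete, here is a minimal sketch (assuming Table1 has a numeric column col1, as in the aggregation below):

df1 = spark.table("sourceDB.Table1")
df1.cache()                    # lazy: only marks the plan for caching, reads nothing yet

df2 = df1.filter("col1 > 0")   # still lazy: just extends the query plan

df2.count()                    # action: scans the source table and fills df1's cache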

After some time, I perform some operation that requires the data to be retrieved from the source. For example:

df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()

At this point, the data must have been read and stored in the cache for df1. So here I need an expression that evaluates to "True".

Is there any way we can achieve this? I believe df1.is_cached will not help in this situation.
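For context on why is_cached does not help: in PySpark it is just a flag that cache()/persist() set on the Python DataFrame object, so it only reports that caching was requested, not that the data was materialized. A quick sketch:

df1 = spark.table("sourceDB.Table1")
print(df1.is_cached)  # False: cache() has not been called

df1.cache()           # no Spark job runs here
print(df1.is_cached)  # True already, even though nothing has been read yet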

Ariosto answered 22/7/2020 at 12:57 (2 comments)
I don't think there is any such function that marks true or false. But from the way you explained it in the question, it looks like you already know when the data is loaded and until when loading will be skipped due to lazy evaluation. As a rule of thumb: until an action is called on a DataFrame, it will not be loaded; only the execution plan is generated up to that point. — Headliner
Thanks Ajay. My use case is a little different. There are some checkpoints in my job that I would like to cache. If the job fails in the middle, I need to store the last cached DataFrame in a table. So I need a programmatic way of finding out whether the DataFrame is already cached in memory or yet to be cached. — Ariosto

Perhaps this is useful

1. If you want to check whether cache/persist has already been triggered on the DataFrame, you can use the CacheManager to confirm that, as below:

spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty

2. If you want to check whether the data is actually present in memory, perhaps the method below will be helpful:

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // step 1 - check whether dataframe.cache was issued earlier
  if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) {
    // cache statement was already issued
    println("Cache statement is already issued on this dataframe")
    // step 2 - check whether the data is actually loaded in memory
    val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
    cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
  } else false
}

3. Test the above method:

val df = spark.read
  .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
    ".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
  * false
  */

df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
  * Cache statement is already issued on this dataframe
  * false
  */

println(df.count())
println(checkIfDataIsInMemory(df))

/**
  * 1
  * Cache statement is already issued on this dataframe
  * true
  */
Jesusa answered 22/7/2020 at 14:55 (4 comments)
Good... learned a new one: sharedState.cacheManager — Anguine
Thanks for your response and your time. Is there any way we can do this in PySpark instead of Scala? It looks like the sharedState method is not available on the PySpark SparkSession. — Ariosto
Can't you use spark._jsparkSession.sharedState? — Jesusa
@Som, this gives a correct answer only within the same job, but does it show whether a previous job execution cached the data? I want to cache only once and then check in the next job execution whether it is cached or not. — Acth

PySpark version (loosely based on the answer from @som):


from typing import Any

from pyspark.sql import DataFrame


def is_cached(df: DataFrame) -> bool:
    # reach into the JVM session and look up this dataframe's
    # logical plan in the shared CacheManager
    jspark: Any = df.sparkSession._jsparkSession
    jdf: Any = df._jdf
    plan = jdf.queryExecution().logical()
    cache = jspark.sharedState().cacheManager().lookupCachedData(plan)
    return (
        # cache()/persist() was issued on this plan ...
        cache.nonEmpty() and
        # ... and the cached column buffers were actually materialized
        cache.get().cachedRepresentation().cacheBuilder().isCachedColumnBuffersLoaded()
    )

from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1)])
is_cached(df)  # False: cache() has not been called
df.count()
is_cached(df)  # False: an action alone does not cache anything
df.cache()
is_cached(df)  # False: cache() was called, but nothing is materialized yet
df.count()
is_cached(df)  # True: the action materialized the cached plan
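For the checkpoint-recovery use case described in the question's comments, here is a hypothetical sketch built on this helper (the step dataframes and the recovery.last_checkpoint table name are made up for illustration):

# hypothetical recovery flow: on failure, persist the last checkpoint
# dataframe whose cache was actually materialized
checkpoints = []

try:
    step1 = spark.table("sourceDB.Table1").cache()
    checkpoints.append(step1)
    step1.count()  # action: materializes step1's cache

    step2 = step1.groupBy("col3").count().cache()
    checkpoints.append(step2)
    step2.count()  # action: materializes step2's cache
except Exception:
    materialized = [df for df in checkpoints if is_cached(df)]
    if materialized:
        materialized[-1].write.mode("overwrite").saveAsTable("recovery.last_checkpoint")
    raise

Note that this only helps within a single Spark application: as the last comment on the accepted answer hints, cached data does not survive across separate job runs, so a restarted job starts with an empty cache.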

Menstruum answered 5/1 at 4:31
