How to calculate the size of dataframe in bytes in Spark?

I want to write a large DataFrame using repartition, so I need to work out how many partitions to use for my source DataFrame.

numberofpartition = {size of dataframe/default_blocksize}

How to calculate the dataframe size in bytes?

Crept answered 21/4, 2020 at 7:45 Comment(0)
15

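Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes we can get the size that Spark's optimizer estimates for the DataFrame. For a file-based source like the one below, the value matches the size of the files on disk rather than a measured in-memory size. Check the code below.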

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
// Converts bytes to MiB, then divides by 10240, i.e. roughly one partition per 10 GiB.
// Change the last divisor to get the partition size you need.
val numPartitions = (dataSize / 1024.0 / 1024.0 / 10240).ceil.toInt

// Always ask for at least one partition, even if the estimate is 0 bytes.
df.repartition(math.max(1, numPartitions))
      .[...]
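
The same arithmetic can be sketched in plain Python, independent of Spark. This is only an illustration: the helper name `partitions_for` and the 128 MiB default target are assumptions, not something taken from the answer above, so substitute the block size your cluster actually uses.

```python
MIB = 1024 * 1024  # bytes in one mebibyte


def partitions_for(size_in_bytes: int, target_partition_bytes: int = 128 * MIB) -> int:
    """Return how many partitions keep each one at or below the target size.

    The 128 MiB default is an assumption; pass your own block size.
    """
    if target_partition_bytes <= 0:
        raise ValueError("target_partition_bytes must be positive")
    # Integer ceiling division avoids float rounding on very large sizes.
    count = -(-size_in_bytes // target_partition_bytes)
    # Never return 0, since repartition needs at least one partition.
    return max(1, count)


# 763275709 is the sizeInBytes value reported for the ORC file above.
print(partitions_for(763_275_709))  # -> 6
```

The result would then be passed to `df.repartition(...)` in the same way as `numPartitions` above.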

Edit 1: Use the variant below that matches your Spark version.

Spark 3.0.2

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes

Spark 2.4

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes

Spark 2.3

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes

PySpark

spark._jsparkSession.sessionState().executePlan(df._jdf.queryExecution().logical()).optimizedPlan().stats().sizeInBytes()
Oilbird answered 21/4, 2020 at 7:50 Comment(6)
For me, this instruction: spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes, returns far too large a number for a small dataset: Statistics(sizeInBytes=8.11E+236 B, hints=none). Any hints or suggestions? Thank you. - Strephonn
@FernandoAspiazu, I'm facing the same issue and asked a question here: #63845965. If you found a solution, please answer in that post. - Ostosis
Why 1024 in this line: val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt? How did you choose those numbers? - Cahan
In my case I have very big files, more than a GB each, so I converted 1 GB to bytes. That is why you are seeing 1024/1024/1024. - Oilbird
I got the same error. Did you find a solution, @FernandoAspiazu? - Pedro
Sorry for the late response. Unfortunately there is no way to calculate the size of a DataFrame up front, because of its lazy nature. You have to compute the data first to get an idea of how large such a DF could be, especially for aggregated data that is calculated in memory. - Strephonn
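
For estimates like the 8.11E+236 B reported in the comments, one defensive option is to treat implausibly large values as "unknown" instead of feeding them into a partition calculation. A likely cause (an assumption, not confirmed in this thread) is that Spark falls back to a very large default size when it has no statistics for a relation, and operators such as joins multiply their inputs' estimates. The sketch below is plain Python; the name `usable_estimate` and the 1 PiB cutoff are hypothetical choices for illustration.

```python
from typing import Optional

# Assumption: anything above 1 PiB is treated as an unusable estimate.
# The cutoff is arbitrary; pick one that fits your data volumes.
MAX_PLAUSIBLE_BYTES = 1024 ** 5


def usable_estimate(size_in_bytes: int, ceiling: int = MAX_PLAUSIBLE_BYTES) -> Optional[int]:
    """Return the estimate if it looks plausible, otherwise None."""
    if size_in_bytes < 0 or size_in_bytes > ceiling:
        return None
    return size_in_bytes


print(usable_estimate(763_275_709))    # plausible, returned unchanged
print(usable_estimate(int(8.11e236)))  # implausible, prints None
```

When the helper returns None, the size has to come from somewhere else, for example by materializing the data first as the later answers describe.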
0

I'm sorry for the late update.

Unfortunately, because DataFrames are lazy, there is no way to calculate the size of a DataFrame before it has been computed.

You have to compute the data first and write it somewhere (HDFS or any other file system) to get an idea of how large such a DF could be, especially for aggregated data that is calculated in memory.

The approach above can work for data (mostly raw) that is already persisted in the file system, but not for the result of a query that changes the size of a dataset.

For example: you can run an aggregation on 1 TB of data and get back a few GB as a result, but there is no way to know in advance how many GB that result will be. We have to run it :(

By the way, if anyone can refute what I'm saying here, they are very welcome to :)

Cheers.

Strephonn answered 12/3 at 14:44 Comment(0)
0

Sorry for the late post.

For PySpark users, you can use RepartiPy to get the accurate size of your DataFrame as follows:

import repartipy

# Use this if you have enough (executor) memory to cache the whole DataFrame.
# If you do NOT have enough memory (i.e. the DataFrame is too large), use 'repartipy.SamplingSizeEstimator' instead.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()

RepartiPy uses the caching approach internally, as mentioned in the above answer, to calculate the in-memory size of your DataFrame.

Please see the docs for more details.

Shrewmouse answered 2/7 at 14:43 Comment(0)
