I want to write one large DataFrame with repartitioning, so I want to calculate the number of partitions for my source DataFrame:
numberofpartition = {size of dataframe / default_blocksize}
How do I calculate the DataFrame size in bytes?
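For example, something along these lines is what I have in mind; the block size and output path below are just placeholder values, and the size lookup is the part I am missing:
// Sketch of what I am trying to do; computing dataframeSizeInBytes is the missing piece.
val dataframeSizeInBytes: Long = ???                         // <- how do I get this?
val defaultBlockSize = 128L * 1024 * 1024                    // e.g. a 128 MB HDFS block size
val numberofpartition = math.ceil(dataframeSizeInBytes.toDouble / defaultBlockSize).toInt
df.repartition(numberofpartition).write.orc("/tmp/output")   // hypothetical output path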
Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
we can get the size of the actual DataFrame once it is loaded into memory. Check the code below.
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (dataSize / 1024.0 / 1024.0 / 10240).ceil.toInt // bytes -> MB, then roughly 10240 MB (10 GB) per partition; adjust the divisor to get the partition size you need.
df.repartition(if (numPartitions == 0) 1 else numPartitions)
.[...]
Edit - 1: Please use the following logic as per your Spark version.
Spark 3.0.2
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes
Spark 2.4
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
Spark 2.3
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes
PySpark
spark._jsparkSession.sessionState().executePlan(df._jdf.queryExecution().logical()).optimizedPlan().stats().sizeInBytes()
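For reference, a hedged sketch (using the Spark 3.x API above) that ties the size estimate back to the partition calculation earlier in this answer; the 128 MB target partition size is an arbitrary assumption:
// Sketch only: estimate the in-memory size of a DataFrame and derive a partition count from it.
// targetPartitionBytes (128 MB here) is an assumption; tune it to your needs.
def estimatedPartitions(df: org.apache.spark.sql.DataFrame,
                        targetPartitionBytes: Long = 128L * 1024 * 1024): Int = {
  val sizeInBytes = spark.sessionState
    .executePlan(df.queryExecution.logical)
    .optimizedPlan
    .stats
    .sizeInBytes
  math.max(1, math.ceil(sizeInBytes.toDouble / targetPartitionBytes).toInt)
}
df.repartition(estimatedPartitions(df)).write.orc("/tmp/output") // hypothetical output path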
I'm sorry for the late update.
Unfortunately, there is no way to calculate the size of a DataFrame before it is actually computed, because of its lazy nature...
You are forced to compute the data first and write it somewhere (in HDFS or any other FS, obviously) in order to get an idea of the size such a DataFrame could have, mainly for aggregated data that is calculated in memory.
This could work for data (mostly raw) that is already persisted in the file system, but not for the result of a query that can change the size of a dataset.
For example: you can run an aggregation on 1 TB of data and get back a few GB as a result, but we are not able to know in advance how many GB that result will have. We have to run it :(
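If you do take the "compute and write first" route, a minimal sketch of measuring the result on disk could look like the following (the output path and format are hypothetical, and the on-disk size is compressed, so it is not the same as the in-memory footprint):
import org.apache.hadoop.fs.{FileSystem, Path}
// Sketch only: materialize the result first, then measure what actually landed on disk.
val outputPath = "/tmp/output/"                                        // hypothetical path
df.write.mode("overwrite").orc(outputPath)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val sizeOnDisk = fs.getContentSummary(new Path(outputPath)).getLength  // bytes on disk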
By the way, if anyone is able to refute what I'm saying here, they are kindly welcome to :)
Cheers.
Sorry for the late post.
For PySpark users, you can use RepartiPy to get the accurate size of your DataFrame as follows:
import repartipy
# Use this if you have enough (executor) memory to cache the whole DataFrame.
# If you do NOT have enough memory (i.e. the DataFrame is too large), use 'repartipy.SamplingSizeEstimator' instead.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
RepartiPy leverages the caching approach mentioned in the answer above in order to calculate the in-memory size of your DataFrame.
Please see the docs for more details.
spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes returns a far too big number for a small dataset: Statistics(sizeInBytes=8.11E+236 B, hints=none)... Any hint/suggestion? Thank you – Strephonn