How to find size (in MB) of dataframe in pyspark,
df = spark.read.json("/Filestore/tables/test.json")
I want to find the size of df
or test.json
Late answer, but since Google brought me here first, I figure I'll add this answer based on the comment by user @hiryu here.
This is tested and working for me. It requires caching, so it is probably best kept to notebook development.
# Need to cache the table (and force the cache to happen)
df.cache()
df.count() # force caching
# need to access hidden parameters from the `SparkSession` and `DataFrame`
catalyst_plan = df._jdf.queryExecution().logical()
size_bytes = spark._jsparkSession.sessionState().executePlan(catalyst_plan).optimizedPlan().stats().sizeInBytes()
# always try to remember to free cached data once finished
df.unpersist()
print("Total table size: ", convert_size_bytes(size_bytes))
You need to access the hidden _jdf and _jsparkSession variables. Since the Python objects do not expose the needed attributes directly, they won't be shown by IntelliSense.
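To illustrate, here is a small sketch (output types may vary by Spark/py4j version) showing that these attributes are just py4j handles to the JVM-side objects, which is why an IDE cannot autocomplete their methods:
# Sketch: both attributes are py4j wrappers around JVM objects
print(type(df._jdf))               # py4j.java_gateway.JavaObject (the JVM Dataset)
print(type(spark._jsparkSession))  # py4j.java_gateway.JavaObject (the JVM SparkSession)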
My convert_size_bytes function looks like:
def convert_size_bytes(size_bytes):
    """
    Converts a size in bytes to a human readable string using SI units.
    """
    import math
    import sys
    if not isinstance(size_bytes, int):
        size_bytes = sys.getsizeof(size_bytes)
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])
py4j.Py4JException: Method executePlan([class org.apache.spark.sql.catalyst.plans.logical.Filter]) does not exist
I suggest using sc._jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf) instead, as in the snippet below. – Expectoration
My running version:
# Need to cache the table (and force the cache to happen)
df.cache()
nrows = df.count() # force caching
# need to access hidden parameters from the `SparkSession` and `DataFrame`
size_bytes = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf)
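Note that sc here is the SparkContext. If you only created a SparkSession, you can grab the context from it first; a minimal sketch, reusing the convert_size_bytes helper from above:
# sc is the SparkContext backing the SparkSession
sc = spark.sparkContext
size_bytes = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf)
print("Estimated in-memory size:", convert_size_bytes(size_bytes))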
In general this is not easy. You can use
org.apache.spark.util.SizeEstimator
or you can use
df.inputFiles()
and use another API to get the file size directly (I did so using the Hadoop Filesystem API (How to get file size)). Note that this only works if the dataframe was not filtered/aggregated.
We can use explain to get the size:
df.explain('cost')
== Optimized Logical Plan ==
Relation [value#0] text, Statistics(sizeInBytes=24.3 KiB)
You can convert this into MBs.
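For the 24.3 KiB reported above, the conversion is just a division by 1024:
size_in_mib = 24.3 / 1024        # KiB -> MiB
print(f"{size_in_mib:.4f} MiB")  # ~0.0237 MiB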
Officially, you can use Spark's SizeEstimator in order to get the size of a DataFrame. But it seems to provide inaccurate results as discussed here and in other SO topics.
You can use RepartiPy instead to get the accurate size of your DataFrame as follows:
import repartipy
# Use this if you have enough (executor) memory to cache the whole DataFrame.
# If you do NOT have enough memory (i.e. the DataFrame is too large), use 'repartipy.SamplingSizeEstimator' instead.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
RepartiPy leverages L Co's approach internally to calculate the in-memory size of your DataFrame. Please see the docs for more details.
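The raw byte count can then be turned into MB with plain arithmetic (or with the convert_size_bytes helper shown earlier on this page):
# df_size_in_bytes comes from se.estimate() above
df_size_in_mb = df_size_in_bytes / (1024 ** 2)
print(f"DataFrame size: {df_size_in_mb:.2f} MB")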
I initially tried L Co's answer but this did not work for me on pyspark==3.3.0
I built upon this to find a working solution for this version at least:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.master("local").appName("TestApp")
    .getOrCreate()
)
df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
df2 = spark.createDataFrame([("Alice", 2), ("Bob", 5), ("Cat", 7)], ("name", "age"))
def convert_size_bytes(size_bytes):
    """
    Converts a size in bytes to a human readable string using SI units.
    """
    import math
    import sys
    if not isinstance(size_bytes, int):
        size_bytes = sys.getsizeof(size_bytes)
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])
def estimate_df_size(df):
    df.cache()
    df.count()  # force caching
    # need to access hidden parameters from the `SparkSession` and `DataFrame`
    catalyst_plan = df._jdf.queryExecution().logical()
    size_bytes = spark._jsparkSession.sessionState().executePlan(catalyst_plan, df._jdf.queryExecution().mode()).optimizedPlan().stats().sizeInBytes()
    # always try to remember to free cached data once finished
    df.unpersist()
    print("Total table size: ", convert_size_bytes(size_bytes))
estimate_df_size(df)
estimate_df_size(df2)
Using
catalyst_plan = df._jdf.queryExecution().logical()
size_bytes = spark._jsparkSession.sessionState().executePlan(catalyst_plan, df._jdf.queryExecution().mode()).optimizedPlan().stats().sizeInBytes()
with the orders table of the TPC-H benchmark gave 181370637, which does not seem to reflect the actual size.
However, using df.explain('cost') as proposed above gave what seems to be an accurate answer, as it matches the file size measured on disk:
spark.read.parquet('orders.parquet').explain('cost')
== Optimized Logical Plan ==
Relation [o_orderkey#32L,...] parquet, Statistics(sizeInBytes=51.3 MiB)
$ du -b orders.parquet
53838493 orders.parquet
53838493 / 1048576 = 51.34 MiB, matching the sizeInBytes revealed in the optimized logical plan.
Is there a way to parse .explain results to obtain the value for Statistics(sizeInBytes)?
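One possible approach (a sketch, not tested across Spark versions; it assumes explain('cost') prints the Statistics(sizeInBytes=...) line shown above) is to capture the printed plan and pull the value out with a regular expression:
import io
import re
from contextlib import redirect_stdout

def size_from_explain(df):
    """Capture the text printed by df.explain('cost') and return the
    sizeInBytes statistic as (value, unit), e.g. (51.3, 'MiB')."""
    buf = io.StringIO()
    with redirect_stdout(buf):  # explain() prints the plan rather than returning it
        df.explain('cost')
    match = re.search(r"sizeInBytes=([\d.]+)\s*((?:[KMGTPE]i)?B)", buf.getvalue())
    if match:
        value, unit = match.groups()
        return float(value), unit
    return None
Alternatively, the earlier answers show how to read sizeInBytes directly from the optimized plan statistics, which avoids parsing text altogether.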