Optimizing PySpark code by calculating DataFrame size

I'm using the following function, built partly from a code snippet I got from this post: Compute size of Spark dataframe - SizeEstimator gives unexpected results.

I added my own calculation of the partition count, which, as I understood from this tutorial (https://www.youtube.com/watch?v=hvF7tY2-L3U), will be distributed within the workers, and ended up with the function below:

def get_partitions(df):
    partitions = 1
    # Cache and force materialization so the plan's size statistics are populated
    df.cache().foreach(lambda x: x)
    # Ask the optimized logical plan for its estimated size in bytes
    df_size_in_bytes = spark._jsparkSession.sessionState() \
        .executePlan(df._jdf.queryExecution().logical(),
                     df._jdf.queryExecution().mode()) \
        .optimizedPlan() \
        .stats() \
        .sizeInBytes()
    kilo_bytes = int(df_size_in_bytes / 1024)
    mega_bytes = int(kilo_bytes / 1024)
    # Target roughly 128 MB per partition
    parts = int(mega_bytes / 128)
    if parts <= 0:
        parts = partitions
    else:
        partitions = parts
    return partitions

However, it gives me the following error:

Py4JError: An error occurred while calling o8705.mode. Trace:
py4j.Py4JException: Method mode([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

I'm working with AWS Glue jobs and interactive sessions. In Glue jobs I get an empty trace, and in Glue interactive sessions I get the error above. What is missing in the calculation? I'm currently using Spark 3.1 and Glue 3.0. Any help will be appreciated!

Incisive asked 28 Sep 2023 at 22:28

The Spark version that Glue 3.0 uses (Spark 3.1) does not accept the "mode" parameter for executePlan(); that parameter was introduced in a later commit to Spark core.
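If the helper also has to run on newer runtimes, where executePlan() does take the mode argument, one option is to try the newer signature first and fall back when Py4J reports that the method does not exist. This is only a sketch built on an assumption: it uses the same internal, underscore-prefixed objects as the original function, which are not a stable public API:

from py4j.protocol import Py4JError

def plan_size_in_bytes(spark, df):
    """Optimizer-estimated size of df in bytes, tolerant of the
    executePlan() signature change between Spark versions."""
    qe = df._jdf.queryExecution()
    state = spark._jsparkSession.sessionState()
    try:
        # Newer Spark: QueryExecution exposes mode(), and executePlan() takes it
        plan = state.executePlan(qe.logical(), qe.mode())
    except Py4JError:
        # Older Spark (e.g. Glue 3.0 / Spark 3.1): single-argument signature
        plan = state.executePlan(qe.logical())
    return plan.optimizedPlan().stats().sizeInBytes()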

So, if you only target Glue 3.0, you just need to remove the parameter from the method call:

from math import ceil

def get_partitions(df):
    # Cache and force materialization so the size statistics are populated
    df.cache().foreach(lambda x: x)
    # No mode argument here: the Spark build in Glue 3.0 lacks that signature
    df_size_in_bytes = spark._jsparkSession.sessionState() \
        .executePlan(
            df._jdf.queryExecution().logical()
        ).optimizedPlan() \
        .stats() \
        .sizeInBytes()

    # Convert bytes to megabytes and aim for roughly 128 MB per partition
    mega_bytes = df_size_in_bytes / 1000**2
    parts = ceil(mega_bytes / 128)
    partitions = 1 if parts == 0 else parts

    return partitions
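
As a quick usage sketch (the output path here is hypothetical), you can repartition to the computed count before writing, so each output file lands near the 128 MB target:

num_partitions = get_partitions(df)

# Hypothetical output location; adjust to your job's sink
df.repartition(num_partitions) \
    .write.mode("overwrite") \
    .parquet("s3://my-bucket/optimized-output/")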
Billiton answered 29 Sep 2023 at 21:33
@100chou Thank you so much for the reference, I'm testing it right now. This will save me days of debugging the issue I'm facing with my code. - Incisive
You're welcome. I adjusted the code because I realized the conversion from bytes to megabytes was wrong; nothing too problematic, just to be accurate. - Billiton
