How to check specific partition data from Spark partitions in Pyspark

I have created two DataFrames in PySpark from my Hive tables:

data1 = spark.sql("""
   SELECT ID, MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
   FROM MODEL_TABLE1 WHERE COUNTRY_CODE IN ('IND','CHN','USA','RUS','AUS')
""")

Each country has millions of unique IDs in alphanumeric format.

data2 = spark.sql("""
   SELECT ID, MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
   FROM MODEL_TABLE2 WHERE COUNTRY_CODE IN ('IND','CHN')
""")

I want to join these two DataFrames on the ID column using PySpark.

How can we repartition the data so that it gets distributed uniformly across the partitions?

Can I use the following to repartition my data?

newdf1 = data1.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
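
The join itself would then be roughly the following (the inner join type is just an example):

joined = newdf1.join(newdf2, on="ID", how="inner")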

What would be the best way to partition the data so that the join runs faster?

Chequer asked 4/10, 2018 at 8:44

As far as I know, your approach of repartitioning by the ID column is correct. Consider the following as a proof of concept, using spark_partition_id() to get the corresponding partition id:

Create some dummy data

import pandas as pd
import numpy as np
from pyspark.sql.functions import spark_partition_id

def create_dummy_data():
    """Create a small Spark DataFrame with a handful of repeated ids and random values."""
    data = np.vstack([np.random.randint(0, 5, size=10),
                      np.random.random(10)])

    df = pd.DataFrame(data.T, columns=["id", "values"])

    return spark.createDataFrame(df)

def show_partition_id(df):
    """Helper function to show the partition id of each row."""
    return df.select(*df.columns, spark_partition_id().alias("pid")).show()

df1 = create_dummy_data()
df2 = create_dummy_data()

Show partition id before repartitioning

show_partition_id(df1)

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885|  0|
|3.0| 0.4613520717857513|  0|
|0.0|  0.797734780966592|  1|
|2.0|0.35594664760134587|  1|
|2.0|0.08223203758144915|  2|
|0.0| 0.3112880092048709|  2|
|4.0| 0.2689639324292137|  3|
|1.0| 0.6466782159542134|  3|
|0.0| 0.8340472796153436|  3|
|4.0| 0.8054752411745659|  3|
+---+-------------------+---+

show_partition_id(df2)

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533|  0|
|3.0| 0.4084717827425539|  0|
|3.0|  0.798146627431009|  1|
|4.0| 0.8039931522181247|  1|
|3.0|  0.732125135531736|  2|
|0.0|  0.536328329270619|  2|
|1.0|0.25952064363007576|  3|
|2.0| 0.1958334111199559|  3|
|0.0|  0.728098753644471|  3|
|0.0| 0.9825387111807906|  3|
+---+-------------------+---+

Show partition id after repartitioning

show_partition_id(df1.repartition(2, "id"))

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885|  0|
|3.0| 0.4613520717857513|  0|
|4.0| 0.2689639324292137|  0|
|1.0| 0.6466782159542134|  0|
|4.0| 0.8054752411745659|  0|
|0.0|  0.797734780966592|  1|
|2.0|0.35594664760134587|  1|
|2.0|0.08223203758144915|  1|
|0.0| 0.3112880092048709|  1|
|0.0| 0.8340472796153436|  1|
+---+-------------------+---+

show_partition_id(df2.repartition(2, "id"))

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533|  0|
|3.0| 0.4084717827425539|  0|
|3.0|  0.798146627431009|  0|
|4.0| 0.8039931522181247|  0|
|3.0|  0.732125135531736|  0|
|1.0|0.25952064363007576|  0|
|0.0|  0.536328329270619|  1|
|2.0| 0.1958334111199559|  1|
|0.0|  0.728098753644471|  1|
|0.0| 0.9825387111807906|  1|
+---+-------------------+---+

After repartitioning, ids 0 and 2 end up in one partition and ids 1, 3 and 4 in the other, consistently for both DataFrames.
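
Applied to your tables, a minimal sketch would be to repartition both DataFrames by the join key with the same number of partitions before joining (the column and DataFrame names are taken from your question; the partition count of 100 is arbitrary):

newdf1 = data1.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")

# Same key and same partition count: rows with the same ID hash to the same
# partition index in both DataFrames; verify with the helper defined above
show_partition_id(newdf1)
show_partition_id(newdf2)

joined = newdf1.join(newdf2, on="ID")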

Chronometry answered 4/10, 2018 at 9:59

Comments:
Many thanks for the quick response. I'm getting an error when defining the Python helper function: the line return df.select(*df.columns, spark_partition_id().alias("pid")).show() fails with SyntaxError: only named arguments may follow *expression (File "<stdin>", line 3). – Chequer
Yes, this is related to your Python version. Try return df.select(spark_partition_id().alias("pid"), *df.columns).show() instead. – Chronometry
I was trying to join these two DataFrames, but it proceeds with a sort-merge join instead of a shuffle hash join, even after setting spark.conf.set("spark.sql.join.preferSortMergeJoin", "false") and spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1"). – Chequer
Well, I wonder whether you get more performance by manually repartitioning before joining instead of letting Spark's DataFrame optimizer figure out the most performant way of merging anyway; it would be interesting to see benchmarks on this. However, I can't help you any further here, as I simply don't know exactly what's going on. Perhaps the merge sort is only used within partitions, which would be fine because no shuffling is required. – Chronometry
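
Regarding the join strategy discussed in the comments, one way to see which strategy Spark actually picks is to inspect the physical plan. A rough sketch (the config keys are the ones from the comment above; setting autoBroadcastJoinThreshold to -1 disables broadcast joins entirely, and behaviour can differ between Spark versions):

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # -1 disables broadcast joins

joined = newdf1.join(newdf2, on="ID")
joined.explain()  # look for SortMergeJoin, ShuffledHashJoin or BroadcastHashJoin in the plan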
