Order of table joining in SparkSQL for better performance

I am new to using Spark SQL to read Hive tables, and I want to know how Spark performs a multi-table join. I read somewhere that it is recommended to always put the largest table first in the join order, followed by the next largest and so on, because this helps join efficiency. I also read that, in a join, Spark loads the first (largest) table in the order into memory and streams the other table, which aids join performance. However, I am confused about how this strategy would boost performance, since in most cases the largest table will not fit in memory and will spill to disk.

Can anyone please clarify and explain the joining mechanism Spark employs under the hood when joining [large vs medium], [large vs small] and [large vs large] tables, in terms of join types (inner and outer) and join performance? I want to know the best practices for ordering the tables in a join to achieve optimal performance across the join strategies Spark employs (SMJ, ShuffleHash and Broadcast). Let's assume the following query:

select 
a.id, 
b.cust_nm, 
c.prod_nm
from large_tab a
join medium_tab b
on a.id = b.id
join small_tab c
on a.pid = c.pid;

Note: We use Spark 2.4

Any help is much appreciated. Thanks.

Allenaallenby answered 20/6, 2020 at 17:47 Comment(1)
This might be a useful description: github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/… (Ansermet)

Regarding the order of the joins, Spark can find the optimal order of the tables in the join itself, but this depends on a few configuration settings (the code below uses the PySpark API):

  1. CBO (the cost-based optimizer) has to be turned on (it is off by default in 2.4):
spark.conf.set("spark.sql.cbo.enabled", True)
  2. joinReorder has to be turned on (it is off by default in 2.4):
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
  3. For it to work as well as possible, compute statistics for your tables first. This can be done as follows:
spark.sql("ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1, col2, ...")
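
For the three tables in the question, the steps above could be put together roughly as follows. This is only a sketch: `analyze_statement`, `JOIN_KEYS` and `prepare_for_join_reorder` are hypothetical names introduced here, `spark` is assumed to be an existing SparkSession, and analysing only the join-key columns is an assumption about which columns matter most for the estimates.

```python
def analyze_statement(table, columns):
    """Build an ANALYZE TABLE statement that collects column-level statistics."""
    if not columns:
        raise ValueError("at least one column is required")
    return "ANALYZE TABLE {} COMPUTE STATISTICS FOR COLUMNS {}".format(
        table, ", ".join(columns)
    )


# Join-key columns taken from the query in the question (assumed to be
# the columns worth analysing; adjust for your own schema).
JOIN_KEYS = {
    "large_tab": ["id", "pid"],
    "medium_tab": ["id"],
    "small_tab": ["pid"],
}


def prepare_for_join_reorder(spark):
    """Enable CBO and join reordering, then compute statistics per table."""
    spark.conf.set("spark.sql.cbo.enabled", True)
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
    for table, columns in JOIN_KEYS.items():
        spark.sql(analyze_statement(table, columns))
```

Calling `prepare_for_join_reorder(spark)` once before running the query should be enough; the statistics are stored in the metastore, so they only need to be recomputed when the data changes noticeably.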

Having the statistics computed is quite important here, because Spark uses them to estimate the size of the tables in the join and reorders the tables accordingly. For even better estimates, you can also enable histogram computation for the columns (this is also off by default in 2.4):

spark.conf.set("spark.sql.statistics.histogram.enabled", True)
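
Histograms are generated while column statistics are being computed, so the setting needs to be on before ANALYZE TABLE runs. A minimal sketch of that sequence is shown here, assuming an existing SparkSession named `spark`; the function name `refresh_column_stats_with_histogram` is hypothetical, and the DESCRIBE EXTENDED call is just one way to inspect what was stored for a column.

```python
def refresh_column_stats_with_histogram(spark, table, column):
    """Enable histograms, recompute stats for one column, return its stats."""
    # The flag must be set before the statistics are (re)computed.
    spark.conf.set("spark.sql.statistics.histogram.enabled", True)
    spark.sql(
        "ANALYZE TABLE {} COMPUTE STATISTICS FOR COLUMNS {}".format(table, column)
    )
    # DESCRIBE EXTENDED <table> <column> lists the stored column statistics.
    return spark.sql("DESCRIBE EXTENDED {} {}".format(table, column))
```

For example, `refresh_column_stats_with_histogram(spark, "large_tab", "id").show(truncate=False)` would print the statistics recorded for the `id` column.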

The maximum number of tables for which joinReorder can be used is controlled by this setting

spark.sql.cbo.joinReorder.dp.threshold

and the default value is 12.
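
To see whether the reordering actually happened, one option is to set the threshold explicitly and print the query plans. The sketch below assumes an existing SparkSession named `spark` and the three tables from the question; `show_join_plan` and `QUERY` are hypothetical names used only for illustration.

```python
# The query from the question, unchanged apart from layout.
QUERY = """
SELECT a.id, b.cust_nm, c.prod_nm
FROM large_tab a
JOIN medium_tab b ON a.id = b.id
JOIN small_tab c ON a.pid = c.pid
"""


def show_join_plan(spark, query=QUERY, dp_threshold=12):
    """Set the join-reorder threshold and print the plans for the query."""
    # 12 is the default mentioned above; it is passed explicitly for clarity.
    spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", dp_threshold)
    # explain(True) prints the logical plans as well as the physical plan,
    # so the join order and the chosen join strategies can be compared.
    spark.sql(query).explain(True)
```

Running `show_join_plan(spark)` with and without the CBO settings enabled lets you compare the join order in the optimized plan.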

Loran answered 21/6, 2020 at 7:40 Comment(0)
