DataFrame join optimization - Broadcast Hash Join
Asked Answered
C

6

51

I am trying to effectively join two DataFrames, one of which is large and the second is a bit smaller.

Is there a way to avoid all this shuffling? I cannot set autoBroadCastJoinThreshold, because it supports only Integers - and the table I am trying to broadcast is slightly bigger than integer number of bytes.

Is there a way to force broadcast ignoring this variable?

Coo answered 7/9, 2015 at 9:26 Comment(2)
feel like your actual question is "Is there a way to force broadcast ignoring this variable?" It can be controlled through the property I mentioned below.. Since no one addressed, to make it relevant I gave this late answer.Hope that helps!Eaten
Please accept once of the answers as accepted. it will be pointer to others as well. Thanks!Eaten
E
96

Broadcast Hash Joins (similar to map side join or map-side combine in Mapreduce) :

In SparkSQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for join by calling method broadcast on the DataFrame before joining it

Example: largedataframe.join(broadcast(smalldataframe), "key")

in DWH terms, where largedataframe may be like fact
smalldataframe may be like dimension

As described by my fav book (HPS) pls. see below to have better understanding.. enter image description here

Note : Above broadcast is from import org.apache.spark.sql.functions.broadcast not from SparkContext

Spark also, automatically uses the spark.sql.conf.autoBroadcastJoinThreshold to determine if a table should be broadcast.

Tip : see DataFrame.explain() method

def
explain(): Unit
Prints the physical plan to the console for debugging purposes.

Is there a way to force broadcast ignoring this variable?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")


NOTE :

Another similar out of box note w.r.t. Hive (not spark) : Similar thing can be achieved using hive hint MAPJOIN like below...

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb

Further Reading : Please refer my article on BHJ, SHJ, SMJ

Eaten answered 9/9, 2016 at 5:35 Comment(3)
Does it make sense to do largeDF.join(broadcast(smallDF), "right_outer") when i want to do smallDF.join(broadcast(largeDF, "left_outer")? since smallDF should be saved in memory instead of largeDFTowandatoward
Pls. follow SparkStrategies.scala here all cases were mentioned. its not single liner to write it here ... changing left_outer to right_outer result will be changed.Eaten
but in normal case Table1 LEFT OUTER JOIN Table2, Table2 RIGHT OUTER JOIN Table1 are equalEaten
S
23

You can hint for a dataframe to be broadcasted by using left.join(broadcast(right), ...)

Sob answered 21/1, 2016 at 11:56 Comment(6)
What is the right import for this broadcast? I'm getting that this symbol broadcast is not resolvable.Liberty
It is under org.apache.spark.sql.functions, you need spark 1.5.0 or newerSob
Any chance to hint broadcast join to a SQL statement?Lozano
You can use the hint in an SQL statement indeed, but not sure how far this works. e.g. I have used it like left inner join broadcast(right) not sure if it will work with a subquerySob
To broadcast in SparkSQL, you can e.g.: val df = broadcast(spark.table("tableA")).createTempView("tableAView") spark.sql("SELECT ... FROM tableAView a JOIN tableB b")Novelistic
HInts are nore supported as of Spark 2.2.0 - see here: issues.apache.org/jira/browse/SPARK-16475Vitrescence
G
7

Setting spark.sql.autoBroadcastJoinThreshold = -1 will disable broadcast completely. See Other Configuration Options in Spark SQL, DataFrames and Datasets Guide.

Gavra answered 4/11, 2016 at 6:55 Comment(0)
S
3

This is a current limitation of spark, see SPARK-6235. The 2GB limit also applies for broadcast variables.

Are you sure there is no other good way to do this, e.g. different partitioning?

Otherwise you can hack your way around it by manually creating multiple broadcast variables which are each <2GB.

Sandstone answered 7/9, 2015 at 21:55 Comment(1)
I have manage to reduce the size of a smaller table to just a little below the 2 GB, but it seems the broadcast is not happening anyways. (autoBroadcast just wont pick it). It works fine with small tables (100 MB) though.Coo
M
1

I found this code works for Broadcast Join in Spark 2.11 version 2.0.0.

import org.apache.spark.sql.functions.broadcast  

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))

import context.implicits._

employeesDF.join(broadcast(tmpDepartments), 
   $"depId" === $"id",  // join by employees.depID == departments.id 
   "inner").show()

Here is the reference for the above code Henning Kropp Blog, Broadcast Join with Spark

Manmade answered 11/8, 2017 at 14:10 Comment(0)
C
1

Using join hints will take precedence over the configuration autoBroadCastJoinThreshold, so using a hint will always ignore that threshold.

In addition, when using a join hint the Adaptive Query Execution (since Spark 3.x) will also not change the strategy given in the hint.

In Spark SQL you can apply join hints as shown below:

SELECT /*+ BROADCAST */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ BROADCASTJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

SELECT /*+ MAPJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id

Note, that the key words BROADCAST, BROADCASTJOIN and MAPJOIN are all aliases as written in the code in hints.scala.

Circumscissile answered 23/4, 2021 at 8:20 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.