Short answer:
1) org.apache.spark.sql.functions.broadcast() is a user-supplied, explicit hint for a given SQL join.
2) sc.broadcast is for broadcasting a read-only shared variable.
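To make the difference concrete, here is a minimal sketch of both. It assumes a local SparkSession and made-up DataFrames; all names here are illustrative, not from the original answer:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical data, just to have something to join on.
val largeDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
val smallDF = Seq((1, "x"), (2, "y")).toDF("id", "tag")

// 1) functions.broadcast(): an explicit hint to the planner that smallDF
//    should be broadcasted for this particular join.
val joined = largeDF.join(broadcast(smallDF), Seq("id"))

// 2) sc.broadcast: a read-only shared variable, shipped once to each
//    executor and then read from tasks via .value.
val lookup = spark.sparkContext.broadcast(Map(1 -> "x", 2 -> "y"))
val tagged = largeDF.map { row =>
  (row.getInt(0), lookup.value.getOrElse(row.getInt(0), "none"))
}
```

The first affects only the physical plan of one join; the second is a general Spark Core mechanism usable in any closure.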
More details about the broadcast function (#1):
Here is the Scaladoc from sql/execution/SparkStrategies.scala, which says:
- Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold, or if that side has an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join will be broadcasted and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcasted then the
- Shuffle hash join: if the average size of a single partition is small enough to build a hash table.
- Sort merge: if the matching join keys are sortable.
- If there is no joining keys, Join implementations are chosen with the following precedence:
  - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
  - CartesianProduct: for Inner join
  - BroadcastNestedLoopJoin
The method below controls this behavior based on the size configured via spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default.
Note : smallDataFrame.join(largeDataFrame)
does not do a broadcast hash join, but largeDataFrame.join(smallDataFrame)
does.
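With the explicit hint, however, the hinted side is broadcasted regardless of which DataFrame the join is called on. A sketch, assuming smallDataFrame and largeDataFrame exist and share a column named "key":

```scala
import org.apache.spark.sql.functions.broadcast

// Both orderings produce a broadcast hash join once the hint is applied.
val bhj1 = largeDataFrame.join(broadcast(smallDataFrame), "key")
val bhj2 = broadcast(smallDataFrame).join(largeDataFrame, "key")

// Confirm the chosen strategy in the physical plan:
// the output should contain BroadcastHashJoin.
bhj1.explain()
```

Checking .explain() output is the quickest way to verify which join strategy the planner actually picked.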
/** Matches a plan whose output should be small enough to be used in broadcast join. */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.statistics.isBroadcastable ||
    plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
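The threshold itself can be read and changed at runtime. A sketch, assuming a running SparkSession bound to spark (this is a config fragment, values in bytes):

```scala
// Inspect the current threshold (default is 10 MB = 10485760 bytes).
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Raise it to 100 MB so larger tables qualify for auto-broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Or set -1 to disable automatic broadcast joins entirely;
// explicit broadcast() hints still work.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```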
Note: this configuration may be deprecated in coming versions of Spark.