When using the DataFrame broadcast function or the SparkContext broadcast functions, what is the maximum object size that can be dispatched to all executors?
broadcast function:
The default is 10 MB, controlled by spark.sql.autoBroadcastJoinThreshold, but we have raised it as high as 300 MB.
AFAIK, it all depends on the memory available, so there is no definite answer for this. What I would say is that the broadcast side should be the smaller of the two DataFrames, and you can estimate the size of a large or small DataFrame like below...
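import org.apache.spark.util.SizeEstimator
// SizeEstimator measures the JVM object graph reachable from the reference passed in, so treat the result as a rough estimate.
// logInfo takes a String, so interpolate the estimate (a Long, in bytes).
logInfo(s"Estimated size: ${SizeEstimator.estimate(yourlargeorsmalldataframehere)} bytes")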
Based on this, you can pass the broadcast hint to the framework.
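A minimal sketch of passing the hint, assuming Spark 2.3 or later on the classpath and a local session. The tables `orders` and `countries`, their contents, and the object name are made up for illustration; the 300 MB threshold simply mirrors the figure mentioned above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object BroadcastHintSketch {
  // Joins a hypothetical fact table with a small lookup table,
  // explicitly hinting that the lookup side should be broadcast.
  def joinWithHint(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Hypothetical data, only here to make the sketch runnable.
    val orders = Seq((1, "DE"), (2, "FR"), (3, "DE")).toDF("orderId", "countryCode")
    val countries = Seq(("DE", "Germany"), ("FR", "France")).toDF("countryCode", "countryName")

    // Catalyst's own size estimate for the small side, in bytes.
    val estimatedBytes = countries.queryExecution.optimizedPlan.stats.sizeInBytes
    println(s"Estimated size of lookup table: $estimatedBytes bytes")

    // Optional: raise the automatic threshold (value is in bytes).
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 300L * 1024 * 1024)

    // Explicit hint: ask Spark to broadcast `countries` for this join.
    orders.join(broadcast(countries), Seq("countryCode"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-hint-sketch")
      .getOrCreate()

    val joined = joinWithHint(spark)
    joined.explain() // inspect the physical plan to see which join was picked
    joined.show()

    spark.stop()
  }
}
```

The plan's own statistics are used here instead of SizeEstimator because they describe the data rather than the driver-side object; both are only estimates.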
Also have a look at the Scaladoc in sql/execution/SparkStrategies.scala, which says:
- Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold or if that side has an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join will be broadcasted and the other side will be streamed, with no shuffling performed. If both sides are below the threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
- Shuffle hash join: if the average size of a single partition is small enough to build a hash table.
- Sort merge: if the matching join keys are sortable.
- If there is no joining keys, Join implementations are chosen with the following precedence:
  - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
  - CartesianProduct: for Inner join
  - BroadcastNestedLoopJoin
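The precedence in that Scaladoc can be sketched as a small decision function. This is a simplified model for illustration only, not Spark's planner code: the types `Side` and `JoinInfo`, the flags, and the fall-through from non-sortable keys to the no-key rules are all my assumptions.

```scala
object JoinSelectionSketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object ShuffleHashJoin extends Strategy
  case object SortMergeJoin extends Strategy
  case object BroadcastNestedLoopJoin extends Strategy
  case object CartesianProduct extends Strategy

  // Hypothetical description of one side of a join.
  final case class Side(estimatedBytes: Long, hasBroadcastHint: Boolean = false)

  // Hypothetical summary of the facts the quoted rules depend on.
  final case class JoinInfo(
      left: Side,
      right: Side,
      hasEquiJoinKeys: Boolean,
      keysSortable: Boolean = true,
      partitionsFitHashTable: Boolean = false,
      isInnerJoin: Boolean = true)

  // A side qualifies if it carries an explicit hint or is below the threshold.
  def canBroadcast(side: Side, thresholdBytes: Long): Boolean =
    side.hasBroadcastHint ||
      (side.estimatedBytes >= 0 && side.estimatedBytes < thresholdBytes)

  def choose(join: JoinInfo, thresholdBytes: Long): Strategy = {
    val broadcastable =
      canBroadcast(join.left, thresholdBytes) || canBroadcast(join.right, thresholdBytes)

    if (join.hasEquiJoinKeys && broadcastable) BroadcastHashJoin
    else if (join.hasEquiJoinKeys && join.partitionsFitHashTable) ShuffleHashJoin
    else if (join.hasEquiJoinKeys && join.keysSortable) SortMergeJoin
    // No usable join keys (assumption: non-sortable keys also land here).
    else if (broadcastable) BroadcastNestedLoopJoin
    else if (join.isInnerJoin) CartesianProduct
    else BroadcastNestedLoopJoin
  }
}
```

Note how an explicit hint bypasses the size comparison entirely in this model, which is why the threshold alone does not answer the question about explicit broadcasts.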
Also have a look at the "Other Configuration Options" section of the Spark SQL documentation.
SparkContext.broadcast (TorrentBroadcast):
A broadcast shared variable also has a property, spark.broadcast.blockSize, which defaults to 4m. It sets the size of each block the broadcast is split into; it is not a cap on the total size.
AFAIK there is no hard limit that I have seen for this either...
For further information, please see TorrentBroadcast.scala.
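For reference, a minimal sketch of a SparkContext broadcast variable, assuming Spark on the classpath and a local session. The lookup map, the object name, and the helper `lookupNames` are made up for illustration; the block size is set to its default only to show where the property goes.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastVariableSketch {
  // Broadcasts a small, hypothetical lookup map and uses it inside a map().
  def lookupNames(spark: SparkSession): Seq[String] = {
    val sc = spark.sparkContext

    // The value is serialized on the driver and shipped to executors
    // in blocks of spark.broadcast.blockSize.
    val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France"))

    val codes = sc.parallelize(Seq("DE", "FR", "DE"))
    val names = codes
      .map(code => countryNames.value.getOrElse(code, "unknown"))
      .collect()
      .toSeq

    // Release the broadcast data once it is no longer needed.
    countryNames.destroy()
    names
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-variable-sketch")
      .config("spark.broadcast.blockSize", "4m") // the default, shown for illustration
      .getOrCreate()

    println(lookupNames(spark).mkString(", "))
    spark.stop()
  }
}
```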
EDIT:
However, you can have a look at the 2 GB issue, even though it is not officially declared in the docs (I was not able to find anything of this kind there). Please look at SPARK-6235, which is in the "IN PROGRESS" state, and SPARK-6235_Design_V0.02.pdf.
spark.sql.autoBroadcastJoinThreshold appears to be the threshold for an automatic broadcast. I'm asking about the maximum size for an explicit broadcast - similar but different. – Bomar
As of Spark 2.4, there's an upper limit of 8 GB (see the Spark source code).
Update: The 8 GB limit is still valid for Spark 3.2.1 (source code).
Update: Still valid for Spark 3.4 (source code).
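To make the number concrete, here is a small sketch of a pre-flight guard you could run before asking for a broadcast. The answer does not quote the actual check, so the object name, the `>=` comparison, and the error message are my assumptions; only the 8 GB figure comes from the answer.

```scala
object BroadcastSizeGuard {
  // 8 GB expressed in bytes: 8 * 2^30 = 8589934592.
  val MaxBroadcastTableBytes: Long = 8L << 30

  // Returns Right(bytes) when the estimate is under the limit,
  // or Left(message) when it is at or above it (assumed comparison).
  def check(estimatedBytes: Long): Either[String, Long] =
    if (estimatedBytes >= MaxBroadcastTableBytes)
      Left(s"Cannot broadcast: $estimatedBytes bytes is at or above the " +
        s"${MaxBroadcastTableBytes >> 30} GB limit")
    else
      Right(estimatedBytes)

  def main(args: Array[String]): Unit = {
    println(check(300L * 1024 * 1024)) // a 300 MB estimate
    println(check(9L << 30))           // a 9 GB estimate
  }
}
```

Staying under this limit does not mean the broadcast is a good idea; driver and executor memory still have to hold the data.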
As mentioned above, the upper limit is 8 GB. But when you have several files you want to broadcast, Spark pushes all the data files to the driver. The driver joins those files and pushes the result to the executor nodes. In this process, if the driver's available memory is less than the combined size of the broadcast files, you will end up with an out-of-memory error.
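A rough way to sanity-check this before broadcasting is to compare the combined estimated size with the driver's heap. The sketch below is plain Scala; the helper name `fitsInDriver`, the example sizes, and the safety factor of 2.0 are arbitrary assumptions for illustration, not values taken from Spark.

```scala
object DriverMemoryCheck {
  // True when the combined size, multiplied by a safety factor to leave
  // headroom for serialization and other driver work, fits in the budget.
  def fitsInDriver(
      sizesBytes: Seq[Long],
      availableBytes: Long,
      safetyFactor: Double = 2.0): Boolean =
    sizesBytes.sum.toDouble * safetyFactor <= availableBytes.toDouble

  def main(args: Array[String]): Unit = {
    // Maximum heap of the current JVM; on the driver this reflects its memory setting.
    val available = Runtime.getRuntime.maxMemory()

    // Hypothetical estimated sizes of the files to broadcast: 200 MB and 300 MB.
    val sizes = Seq(200L << 20, 300L << 20)

    println(s"Combined size: ${sizes.sum} bytes, driver heap: $available bytes")
    println(s"Fits with headroom: ${fitsInDriver(sizes, available)}")
  }
}
```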
rdd.cache() to cache an RDD in memory, but there too they have not mentioned any limit for this in the docs. "not sure it's wise to broadcast that much data"... I agree with you on this. – Lawless