Short answer
- Values are sent the first time they are needed in an executor. Nothing is sent when sc.broadcast(variable) is called.
- The data is sent only to the nodes that contain an executor that needs it.
- The data is stored in memory. If not enough memory is available, the disk is used.
- Yes, there is a big difference between accessing a local variable and a broadcast variable. A broadcast variable has to be downloaded the first time it is accessed on an executor; after that, it is read from that executor's local storage.
Long answer
The answer is in Spark's source, in TorrentBroadcast.scala.
When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks(), which is called when the TorrentBroadcast object is initialized:
- The object is cached unserialized locally using the MEMORY_AND_DISK policy.
- It is serialized.
- The serialized data is compressed[0], and the resulting stream is split into 4 MB blocks, which are saved locally[1].
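The serialize, compress and split steps can be sketched in plain Scala. This is a minimal sketch, not Spark's code: it assumes Java serialization in place of Spark's configured serializer and the JDK's GZIP in place of LZ4, leaves out the unserialized local copy, and `blockify` is a hypothetical name. A tiny block size is used so that the splitting is visible.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

// Hypothetical helper: serialize `obj`, compress the byte stream,
// then cut the compressed bytes into blocks of at most `blockSize` bytes.
// Assumptions: Java serialization stands in for Spark's serializer, GZIP for LZ4.
def blockify(obj: AnyRef, blockSize: Int): Vector[Array[Byte]] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
  try out.writeObject(obj) finally out.close() // close() also finishes the GZIP stream
  bytes.toByteArray.grouped(blockSize).toVector
}

// Usage: 1 KB blocks instead of the 4 MB default.
val data: Vector[Int] = Vector.tabulate(10000)(i => i * i)
val blocks = blockify(data, blockSize = 1024)
println(s"${blocks.size} blocks, sizes: ${blocks.map(_.length).mkString(", ")}")
```

Every block except possibly the last one has exactly `blockSize` bytes, and concatenating the blocks gives back the full compressed stream.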
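Executors never receive the value itself when the broadcast is created. They only get the lightweight TorrentBroadcast object (for instance, serialized inside a task's closure), which does not include the value: it carries the broadcast object's identifier and its number of blocks.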
The TorrentBroadcast object has a lazy[2] property that contains its value. When the value method is called, this lazy property is returned. So the first time the value method is called in a task, the following happens:
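- The blocks are read in a random order. Each block is first looked up in the local block manager.
- If a block is not present in the local block manager, getRemoteBytes is called on the block manager to fetch it from the driver or from another executor. Network traffic happens only at that time.
- A block that wasn't present locally is then cached using MEMORY_AND_DISK_SER and reported to the driver, so that other executors can fetch it from this one.
- Once all the blocks are available, they are concatenated, decompressed, and deserialized to rebuild the value.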
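The read path can be simulated end to end with in-memory maps. This is a hedged sketch under several assumptions: the mutable maps stand in for block managers, a lookup in `remoteStore` stands in for getRemoteBytes, Java serialization and GZIP stand in for Spark's serializer and LZ4, and `BroadcastHandle`, `writeBlocks` and `pieceId` are hypothetical names rather than Spark's API. The handle only knows the identifier and the block count, and a lazy val defers all fetching until `value` is first read.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable
import scala.util.Random

// Hypothetical in-memory "block managers": block id -> bytes.
val remoteStore = mutable.Map.empty[String, Array[Byte]] // e.g. the driver's blocks
var remoteFetches = 0                                    // counts simulated network reads

def pieceId(id: Long, i: Int): String = s"broadcast_${id}_piece$i"

// Driver side: serialize + compress, split into blocks, store them; return the block count.
def writeBlocks(id: Long, obj: AnyRef, blockSize: Int): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
  try out.writeObject(obj) finally out.close()
  val blocks = bytes.toByteArray.grouped(blockSize).toVector
  for ((block, i) <- blocks.zipWithIndex) remoteStore(pieceId(id, i)) = block
  blocks.size
}

// Executor side: the handle only knows the id and the number of blocks.
final class BroadcastHandle(id: Long, numBlocks: Int, localStore: mutable.Map[String, Array[Byte]]) {
  lazy val value: AnyRef = {
    val blocks = new Array[Array[Byte]](numBlocks)
    for (i <- Random.shuffle((0 until numBlocks).toList)) { // random order
      val pid = pieceId(id, i)
      blocks(i) = localStore.getOrElse(pid, {
        remoteFetches += 1        // the only place where "network" traffic happens
        val fetched = remoteStore(pid)
        localStore(pid) = fetched // cache the block on this executor
        fetched
      })
    }
    // Only once every block is present: concatenate, decompress, deserialize.
    val in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(blocks.reduce(_ ++ _))))
    try in.readObject() finally in.close()
  }
}

// Usage: two tasks on the same executor share one local store.
val original = Vector.tabulate(10000)(i => i * i)
val numBlocks = writeBlocks(id = 0L, obj = original, blockSize = 1024)
val executorStore = mutable.Map.empty[String, Array[Byte]]
val first = new BroadcastHandle(0L, numBlocks, executorStore).value  // fetches every block remotely
val second = new BroadcastHandle(0L, numBlocks, executorStore).value // finds every block locally
println(s"$numBlocks blocks, $remoteFetches remote fetches")
```

The second handle triggers no remote fetch, because every block was cached locally by the first one. Creating a handle costs nothing until `value` is read.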
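[0] The data is compressed with LZ4 by default. This can be tuned: spark.broadcast.compress turns compression on or off, and spark.io.compression.codec selects the codec.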
[1] The blocks are stored in the local block manager using MEMORY_AND_DISK_SER, which means that blocks that don't fit in memory are spilled to disk. Each block has a unique identifier, computed from the identifier of the broadcast variable and the block's index. The size of blocks can be configured with spark.broadcast.blockSize; it is 4 MB by default.
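The block layout can be made concrete with a small hypothetical helper that computes how many blocks a broadcast needs for a given compressed size (ceiling division by the 4 MB default) and builds their identifiers. The `broadcast_<id>_piece<i>` naming mirrors Spark's BroadcastBlockId, but treat the exact format as an assumption that may differ between versions.

```scala
// Hypothetical helper, not a Spark API: number of blocks and their ids.
// Assumption: ids follow the "broadcast_<id>_piece<i>" pattern of BroadcastBlockId.
def pieceIds(broadcastId: Long, serializedSize: Long, blockSize: Long = 4L * 1024 * 1024): Seq[String] = {
  val numBlocks = ((serializedSize + blockSize - 1) / blockSize).toInt // ceiling division
  (0 until numBlocks).map(i => s"broadcast_${broadcastId}_piece$i")
}

// A 10 MB compressed broadcast needs three 4 MB blocks.
val ids = pieceIds(broadcastId = 7L, serializedSize = 10L * 1024 * 1024)
println(ids.mkString(", ")) // broadcast_7_piece0, broadcast_7_piece1, broadcast_7_piece2
```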
[2] A lazy val in Scala is a value that is evaluated the first time it is accessed, and then cached. See the documentation.
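A minimal illustration of that behavior: the counter shows that the initializer, which here stands in for the expensive block fetching, runs only on the first access. `Holder` and `evaluations` are illustrative names only.

```scala
// Counts how many times the lazy initializer actually runs.
var evaluations = 0

class Holder {
  lazy val value: String = {
    evaluations += 1 // stands in for fetching and rebuilding the broadcast value
    "payload"
  }
}

val h = new Holder
println(evaluations) // 0: nothing evaluated yet
println(h.value)     // payload (first access runs the initializer)
println(h.value)     // payload (cached, initializer not run again)
println(evaluations) // 1
```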