In Spark, how does broadcast work?

This is a very simple question: in Spark, broadcast can be used to send variables to executors efficiently. How does this work?

More precisely:

  • When are values sent: as soon as I call broadcast, or when the values are used?
  • Where exactly is the data sent: to all executors, or only to the ones that will need it?
  • Where is the data stored: in memory, or on disk?
  • Is there a difference in how simple variables and broadcast variables are accessed? What happens under the hood when I call the .value method?
Larue answered 18/11, 2016 at 20:30 Comment(0)

Short answer

  • Values are sent the first time they are needed in an executor. Nothing is sent when sc.broadcast(variable) is called.
  • The data is sent only to the nodes that contain an executor that needs it.
  • The data is stored in memory. If not enough memory is available, the disk is used.
  • Yes, there is a big difference between accessing a local variable and a broadcast variable. Broadcast variables have to be downloaded the first time they are accessed.

Long answer

The answer is in Spark's source, in TorrentBroadcast.scala.

  1. When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks(), which is called when the TorrentBroadcast object is initialized:

    1. The object is cached unserialized locally using the MEMORY_AND_DISK policy.
    2. It is serialized.
    3. The serialized version is split into 4 MB blocks, which are compressed[0] and saved locally[1].
  2. When new executors are created, they receive only the lightweight TorrentBroadcast object, which contains just the broadcast object's identifier and its number of blocks.

  3. The TorrentBroadcast object has a lazy[2] property that contains its value. When the value method is called, this lazy property is returned. So the first time value is called in a task, the following happens:

    1. In a random order, blocks are fetched from the local block manager and uncompressed.
    2. If they are not present in the local block manager, getRemoteBytes is called on the block manager to fetch them. Network traffic happens only at that time.
    3. If the block wasn't present locally, it is cached using MEMORY_AND_DISK_SER.
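
The steps above can be mimicked with a small, Spark-free model. This is only a sketch under stated assumptions, not Spark's implementation: plain dicts stand in for block managers, pickle for Spark's serializer, zlib for lz4, and the names write_blocks, BroadcastHandle and block_key are hypothetical. It leaves out the unserialized copy kept on the driver and the fact that real executors can also fetch blocks from each other. What it does show is that creating the handle transfers nothing, and that blocks are copied from the remote store only the first time value is read.

```python
import pickle
import random
import zlib

BLOCK_SIZE = 4 * 1024 * 1024  # 4 MiB, the default block size mentioned above


def block_key(broadcast_id, index):
    # Illustrative key built from the broadcast id and the block index
    # (an assumption for this sketch, not a guaranteed Spark format).
    return f"broadcast_{broadcast_id}_piece{index}"


def write_blocks(broadcast_id, value, store, block_size=BLOCK_SIZE):
    """Driver side: serialize, split into blocks, compress, save locally."""
    data = pickle.dumps(value)
    chunks = [data[i:i + block_size] for i in range(0, len(data), block_size)]
    for index, chunk in enumerate(chunks):
        store[block_key(broadcast_id, index)] = zlib.compress(chunk)
    return len(chunks)


class BroadcastHandle:
    """Lightweight handle: only an id and a block count, no data."""

    def __init__(self, broadcast_id, num_blocks, local_store, remote_store):
        self.broadcast_id = broadcast_id
        self.num_blocks = num_blocks
        self.local_store = local_store
        self.remote_store = remote_store
        self.remote_fetches = 0
        self._loaded = False
        self._value = None

    @property
    def value(self):
        if not self._loaded:  # only the first access does any work
            self._value = self._read_blocks()
            self._loaded = True
        return self._value

    def _read_blocks(self):
        pieces = [None] * self.num_blocks
        order = list(range(self.num_blocks))
        random.shuffle(order)  # blocks are fetched in a random order
        for index in order:
            key = block_key(self.broadcast_id, index)
            if key not in self.local_store:
                # Stand-in for the remote fetch; the only "network" step.
                self.local_store[key] = self.remote_store[key]
                self.remote_fetches += 1
            pieces[index] = zlib.decompress(self.local_store[key])
        return pickle.loads(b"".join(pieces))


# Demo with a tiny block size so that the value spans several blocks.
driver_store, executor_store = {}, {}
lookup = {i: str(i) for i in range(1000)}
num_blocks = write_blocks(0, lookup, driver_store, block_size=1024)

handle = BroadcastHandle(0, num_blocks, executor_store, driver_store)
fetches_before = handle.remote_fetches       # nothing fetched yet
first = handle.value                         # triggers the block fetches
fetches_after_first = handle.remote_fetches
second = handle.value                        # served from the cached value
fetches_after_second = handle.remote_fetches
```

In this model the fetch counter stays at zero until value is first read, then equals the number of blocks, and does not grow on later reads.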

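[0] Compressed with lz4 by default. The codec is configurable (spark.io.compression.codec), and broadcast compression can be turned off with spark.broadcast.compress.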

[1] The blocks are stored in the local block manager using MEMORY_AND_DISK_SER, which means that blocks that don't fit in memory are spilled to disk. Each block has a unique identifier, computed from the identifier of the broadcast variable and the block's offset. The block size can be configured (spark.broadcast.blockSize); it is 4 MB by default.

[2] A lazy val in Scala is a variable whose value is evaluated the first time it is accessed, and then cached. See the documentation.
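
For readers who don't know Scala, the "evaluate on first access, then cache" behaviour from footnote [2] can be approximated in Python with functools.cached_property (Python 3.8+). This is an analogy rather than an equivalent of lazy val, and the class name LazyHolder is made up for the example.

```python
from functools import cached_property


class LazyHolder:
    def __init__(self):
        self.evaluations = 0  # counts how often the body below actually runs

    @cached_property
    def value(self):
        # Runs on the first access only; the result is then stored on the
        # instance and returned directly on every later access.
        self.evaluations += 1
        return sum(range(1000))


holder = LazyHolder()
evaluations_before = holder.evaluations  # still 0: nothing computed yet
first = holder.value                     # computed here
second = holder.value                    # cached, not recomputed
```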

Larue answered 19/11, 2016 at 15:53 Comment(14)
If the broadcast variable is sent to each individual cluster node (that needs it), then what's the difference when we don't make it a broadcast variable? How will that variable be handled otherwise? - Mingrelian
Otherwise, the variable would be sent to each executor independently, as part of the task, without using a torrent mechanism. If the variable is large, this would make the task larger and delay its execution. And not using the torrent protocol means that the download would be slower. - Larue
Still didn't get it. What's the difference between sending to each executor independently (in case of broadcast) and sending to each cluster node (normal way)? - Mingrelian
In one case, each executor downloads data from all the others; in the other case, each node downloads the data exclusively from the master. - Larue
BTW, executors and nodes are interchangeable terms, right? So you mean that in case of broadcast a single copy of the variable is present at each node, whereas otherwise multiple copies of the same variable are present at each node (because a given node takes the variable data from each other worker node). Is that rightly understood? - Mingrelian
No, executors and nodes are not the same thing. A single node can run multiple executors. - Larue
And no, in no case is the variable duplicated. In a broadcast, the variable is distributed using a torrent-like protocol, whereas without broadcast, the variable is just distributed together with the task. You should read en.wikipedia.org/wiki/Peer-to-peer_file_sharing - Larue
As far as I understand, you mean that a non-broadcast variable is distributed to each executor at the time when the task is distributed. This takes time because there is only one master node distributing the variable to each executor (slow if the file is large). Whereas in case of broadcast, once the master distributes it, the receiving executors also act as distributors and receivers (peers in a torrent). This makes it fast for the variable to reach each executor in the cluster. Is that rightly understood? - Mingrelian
Again, since you said that one node contains many executors, is the variable distributed to each node or to each executor? Similarly, when distributing tasks, is the group of tasks distributed to each node, which then distributes it to all the executors inside it, or is it something else? What's the process of distribution like? Can you post a link? - Mingrelian
github.com/apache/spark/blob/master/core/src/main/scala/org/… - Larue
At least tell me first whether my first comment is right or not. I'm in a dilemma. - Mingrelian
> in case of a non-broadcast variable, it is distributed to each executor at the time when the task is distributed [...]. Whereas in case of broadcast, once the master distributes it, the receiving executors also act as distributors [...] Is that rightly understood? Yes. - Larue
Fine. This way it looks like using the broadcast way of distributing variables is always faster. Why would I ever use the non-broadcast way to distribute anything? - Mingrelian
You will be distributing the serialized tasks in any case. Making a task a few bytes heavier is worth it if it means the variable will be immediately available when the task starts. But when the value of a variable is very large, it becomes worth it to start the task without the value, and then fetch the value over BitTorrent. - Larue
  • As soon as it is broadcast.
  • It is sent to all executors using the torrent protocol, but loaded only when needed.
  • Once loaded, variables are stored deserialized in memory.
  • Calling .value:

    • validates that the broadcast hasn't been destroyed
    • lazily loads the variable from the blockManager
Faro answered 19/11, 2016 at 6:49 Comment(1)
I found the source, and am currently having a hard time understanding it. In TorrentBroadcast.scala, it seems that it tells the block manager to write bytes only locally when the Broadcast is created, and that network transfer occurs only when value is called (value reads _value, which is a lazy val). That would mean the exact opposite of what you stated: nothing is sent when the variable is created, and the value is sent only to the executors that need it. - Larue
