This is a very simple question: in Spark, broadcast can be used to send variables to executors efficiently. How does this work?
More precisely:
- When are values sent: as soon as I call broadcast, or when the values are used?
- Where exactly is the data sent: to all executors, or only to the ones that will need it?
- Where is the data stored? In memory, or on disk?
- Is there a difference in how simple variables and broadcast variables are accessed? What happens under the hood when I call the .value method?
Short answer
Values are sent the first time they are needed on an executor. Nothing is sent when sc.broadcast(variable) is called.
Long answer
The answer is in Spark's source, in TorrentBroadcast.scala.
When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks(), which is called when the TorrentBroadcast object is initialized:
- The object is stored locally, unserialized, using the MEMORY_AND_DISK policy.
- It is serialized and compressed[0], and the result is split into blocks that are saved locally[1].
When new executors are created, they only have the lightweight TorrentBroadcast object, which contains only the broadcast object's identifier and its number of blocks.
The TorrentBroadcast object has a lazy[2] property that contains its value. When the value method is called, this lazy property is returned. So the first time value is called in a task, the following happens:
- Blocks are looked up in the local block manager.
- If a block is not available locally, getRemoteBytes is called on the block manager to fetch it. Network traffic happens only at that time.
- Blocks fetched remotely are then stored locally using MEMORY_AND_DISK_SER.
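The flow described above can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions, not Spark's code: ToyBlockStore, ToyBroadcast and write_blocks are hypothetical names, zlib and pickle stand in for Spark's codec and serializer, and the block size is shrunk to a few bytes so that several blocks are produced.

```python
import pickle
import random
import zlib
from functools import cached_property

BLOCK_SIZE = 16  # bytes; deliberately tiny (Spark's default is 4 MB)


class ToyBlockStore:
    """Hypothetical stand-in for one process's block manager."""

    def __init__(self):
        self.blocks = {}         # (broadcast_id, index) -> bytes
        self.remote_fetches = 0  # counts simulated network transfers

    def get_local(self, block_id):
        return self.blocks.get(block_id)

    def get_remote(self, block_id, peers):
        # The only place where simulated network traffic happens.
        for peer in peers:
            data = peer.get_local(block_id)
            if data is not None:
                self.remote_fetches += 1
                return data
        raise KeyError(block_id)


def write_blocks(store, broadcast_id, value):
    """Driver side: serialize, compress, split into blocks, store locally."""
    payload = zlib.compress(pickle.dumps(value))
    chunks = [payload[i:i + BLOCK_SIZE] for i in range(0, len(payload), BLOCK_SIZE)]
    for index, chunk in enumerate(chunks):
        store.blocks[(broadcast_id, index)] = chunk
    return len(chunks)


class ToyBroadcast:
    """Lightweight handle: an identifier, a block count, and where to look."""

    def __init__(self, broadcast_id, num_blocks, store, peers):
        self.broadcast_id = broadcast_id
        self.num_blocks = num_blocks
        self.store = store  # this process's block store
        self.peers = peers  # other stores that may hold the blocks

    @cached_property
    def value(self):
        # Evaluated on first access only, then cached (like a Scala lazy val).
        pieces = [None] * self.num_blocks
        order = list(range(self.num_blocks))
        random.shuffle(order)  # this sketch fetches blocks in shuffled order
        for index in order:
            block_id = (self.broadcast_id, index)
            data = self.store.get_local(block_id)
            if data is None:
                data = self.store.get_remote(block_id, self.peers)
                self.store.blocks[block_id] = data  # keep a local copy
            pieces[index] = data
        return pickle.loads(zlib.decompress(b"".join(pieces)))


driver_store = ToyBlockStore()
executor_store = ToyBlockStore()
lookup = {"FR": "France", "DE": "Germany", "JP": "Japan"}

num_blocks = write_blocks(driver_store, 0, lookup)
handle = ToyBroadcast(0, num_blocks, executor_store, [driver_store])

print(executor_store.remote_fetches)                # nothing fetched yet
first = handle.value                                # first access triggers the fetches
print(executor_store.remote_fetches == num_blocks)  # one fetch per block
second = handle.value                               # served from the cache
print(second is first)
```

Creating the handle moves no data; the simulated transfers happen on the first access to value, and later accesses reuse the cached object.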
[0] Compressed with lz4 by default. This can be tuned.
[1] The blocks are stored in the local block manager using MEMORY_AND_DISK_SER, which means that blocks that don't fit in memory are spilled to disk. Each block has a unique identifier, computed from the identifier of the broadcast variable and its offset. The block size is configurable; it is 4 MB by default.
[2] A lazy val in Scala is a variable whose value is evaluated the first time it is accessed, and then cached. See the documentation.
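As a rough illustration of the tuning mentioned in the notes, the sketch below renders a few broadcast-related settings as spark-submit arguments. The property names are Spark configuration keys; the values are arbitrary examples, and to_submit_args and my_job.py are hypothetical names used only for this illustration.

```python
# Example values only; adjust to your workload.
broadcast_settings = {
    "spark.broadcast.blockSize": "8m",     # size of each broadcast block (default 4m)
    "spark.broadcast.compress": "true",    # compress broadcast data before sending
    "spark.io.compression.codec": "zstd",  # codec for internal data (default lz4)
}


def to_submit_args(settings):
    """Render settings as repeated `--conf key=value` arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(broadcast_settings)
# my_job.py is a placeholder for the application script.
print(" ".join(["spark-submit", *submit_args, "my_job.py"]))
```

Note that spark.io.compression.codec is not specific to broadcasts: it also applies to other internal data such as shuffle outputs, so changing it has wider effects.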