I'm trying to understand the Spark shuffle process deeply. While reading about it, I came across the following point:
Spark writes the Map task (ShuffleMapTask) output directly to disk on completion.
I would like to understand the following with respect to Hadoop MapReduce:
If both MapReduce and Spark write the data to the local disk, then how is the Spark shuffle process different from Hadoop MapReduce?
Since data is represented as RDDs in Spark, why don't these outputs remain in the executors' memory?
How is the output of the Map tasks from Hadoop MapReduce and Spark different?
If there are a lot of small intermediate files as output, how does Spark handle the network and I/O bottleneck?
First of all, Spark doesn't work in a strict map-reduce manner, and map output is not written to disk unless it is necessary. What gets written to disk are the shuffle files.
That doesn't mean that data after the shuffle is not kept in memory. Shuffle files in Spark are written mostly to avoid re-computation when there are multiple downstream actions (see the sketch after the list below). Why write to a file system at all? There are at least two interleaved reasons:
- Memory is a valuable resource and in-memory caching in Spark is ephemeral; old data can be evicted from the cache when needed.
- Shuffle is an expensive process we want to avoid when it isn't necessary. It makes more sense to store shuffle data in a manner which keeps it persistent during the lifetime of a given context.
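To make the re-use concrete, here is a minimal sketch (the input path is made up and `sc` is assumed to come from e.g. spark-shell): once the first action has written the shuffle files, a second action over the same shuffled RDD can read them back instead of recomputing the map side, which shows up as skipped stages in the web UI.

```scala
val counts = sc.textFile("hdfs:///tmp/words.txt")  // hypothetical input
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)  // introduces a shuffle, so shuffle files are written

counts.count() // first action: runs the map side and writes the shuffle files
counts.take(5) // second action: reuses those files; the map stage is shown as skipped
```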
Shuffle itself, apart from the ongoing low-level optimization efforts and implementation details, isn't different at all. It is based on the same basic approach, with all its limitations.
How are tasks different from Hadoop maps? As nicely illustrated by Justin Pihony, multiple transformations which don't require shuffles are squashed together into a single task. Since these operate on standard Scala Iterators, operations on individual elements can be piped, roughly as in the sketch below.
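For illustration only (plain Scala, not Spark's actual code): a map followed by a filter over an Iterator processes one element at a time without materializing any intermediate collection, which is essentially what a single pipelined task does with its partition.

```scala
// A stand-in for one partition's data.
val partition: Iterator[Int] = Iterator(1, 2, 3, 4, 5)

// Two "transformations" squashed into one pass. Iterators are lazy, so nothing
// runs until the final consumption below, and no intermediate collection is built.
val piped: Iterator[Int] = partition
  .map(_ * 10)         // like rdd.map
  .filter(_ % 20 == 0) // like rdd.filter

piped.foreach(println) // each element flows through both steps in turn
```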
Regarding network and I/O bottlenecks, there is no silver bullet here. While Spark can reduce the amount of data which is written to disk or shuffled by combining transformations, caching in memory and providing transformation-aware worker preferences, it is subject to the same limitations as any other distributed framework.
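For example (a common illustration, with made-up data and `sc` assumed from spark-shell), reduceByKey combines values on the map side before the shuffle, so far less data crosses the network than an equivalent groupByKey followed by a reduction:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("c", 1)))

// groupByKey ships every single (key, value) pair across the network
// and only then sums the values on the reduce side:
val heavy = pairs.groupByKey().mapValues(_.sum)

// reduceByKey first combines partial sums per key inside each map task,
// so only one value per key per partition is shuffled:
val light = pairs.reduceByKey(_ + _)
```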
If both MapReduce and Spark write the data to the local disk, then how is the Spark shuffle process different from Hadoop MapReduce?
When you execute a Spark application, the very first thing is starting the SparkContext, which becomes the home of multiple interconnected services, with DAGScheduler, TaskScheduler and SchedulerBackend being among the most important ones.
DAGScheduler is the main orchestrator and is responsible for transforming an RDD lineage graph (i.e. a directed acyclic graph of RDDs) into stages. While doing so, DAGScheduler traverses the parent dependencies of the final RDD and creates a ResultStage with parent ShuffleMapStages.
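You can see that split without running a job by printing the lineage. A small sketch (data made up, `sc` assumed from spark-shell) where everything before reduceByKey is pipelined into a ShuffleMapStage and the rest lands in the ResultStage:

```scala
val perKeyMax = sc.parallelize(1 to 1000)
  .map(n => (n % 7, n))   // narrow dependency: stays in the same stage
  .reduceByKey(_ max _)   // wide dependency: introduces the shuffle boundary

// toDebugString prints the RDD lineage; the indentation step marks the shuffle
// boundary that DAGScheduler turns into the ShuffleMapStage / ResultStage split.
println(perKeyMax.toDebugString)
```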
A ResultStage is (mostly) the last stage, with ShuffleMapStages being its parents. I said mostly because I think I may have seen that you can "schedule" a ShuffleMapStage.
This is the very first and earliest optimization Spark applies to your Spark jobs (that together create a Spark application): execution pipelining, where multiple transformations are wired together into a single stage (because their inter-dependencies are narrow). That's what makes Spark faster than Hadoop MapReduce, since two or more transformations can get executed one after another with no data shuffling, possibly all in memory.
A single stage extends as wide as possible until it hits a ShuffleDependency (aka a wide dependency).
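A quick way to see where a stage ends is to look at an RDD's dependencies (a small sketch, with `sc` assumed from spark-shell):

```scala
val nums = sc.parallelize(1 to 100, numSlices = 4)

// Narrow dependency: map keeps the current stage going.
println(nums.map(_ * 2).dependencies)
// e.g. List(org.apache.spark.OneToOneDependency@...)

// Wide dependency: reduceByKey introduces a ShuffleDependency,
// which is exactly where DAGScheduler cuts the stage.
println(nums.map(n => (n % 10, n)).reduceByKey(_ + _).dependencies)
// e.g. List(org.apache.spark.ShuffleDependency@...)
```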
There are RDD transformations that will cause shuffling (because they create a ShuffleDependency). That's the moment where Spark is very much like Hadoop's MapReduce, since it saves partial shuffle outputs to... local disks on executors.
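Where exactly those files land is configurable; a minimal sketch (the directories are placeholders, and note that cluster managers such as YARN or Standalone typically override this setting with their own local directories):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-local-dirs")
  .setMaster("local[*]") // assumed here; normally supplied via spark-submit
  // Scratch space used by executors for shuffle output and spills:
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")

val sc = new SparkContext(conf)
```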
When a Spark application starts, it requests executors from a cluster manager (there are three supported: Spark Standalone, Apache Mesos and Hadoop YARN). This is what SchedulerBackend is for -- to manage communication between your Spark application and cluster resources.
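A minimal sketch of pointing an application at each of the supported cluster managers (host names and resource sizes below are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cluster-manager-demo")
  .setMaster("spark://master-host:7077")     // Spark Standalone
  // .setMaster("mesos://mesos-master:5050") // Apache Mesos
  // .setMaster("yarn")                      // Hadoop YARN (HADOOP_CONF_DIR must be set)
  .set("spark.executor.instances", "4")      // assumed sizing
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
```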
(Let's assume you are not using the External Shuffle Service.)
Executors host their own local BlockManagers that are responsible for managing RDD blocks kept on the local hard drive (and possibly in memory, replicated too). You can control RDD block persistence using the cache and persist operators and StorageLevels. You can use the Storage and Executors tabs in the web UI to track blocks, with their location and size.
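A small sketch of controlling persistence (the input path is made up, `sc` assumed from spark-shell); the blocks created this way are what the Storage tab lists:

```scala
import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("hdfs:///tmp/app.log")
  .filter(_.contains("ERROR"))

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); an explicit level
// such as MEMORY_AND_DISK lets blocks spill to the executor's local disk
// instead of being recomputed when memory runs out.
errors.persist(StorageLevel.MEMORY_AND_DISK)

errors.count() // first action materializes the blocks in the executors' BlockManagers
errors.count() // served from the persisted blocks; see the Storage tab in the web UI
```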
The difference between Spark storing data locally (on executors) and Hadoop MapReduce is that:
The partial results (after computing ShuffleMapStages) are saved on local hard drives, not on HDFS, which is a distributed file system with very expensive writes.
Only some files are saved to the local hard drive (after operations have been pipelined), which does not happen in Hadoop MapReduce, which saves all map outputs to HDFS.
Let me answer the following item:
If there are a lot of small intermediate files as output, how does Spark handle the network and I/O bottleneck?
That's the trickiest part of the Spark execution plan and it heavily depends on how wide the shuffling is. If you work only with local data (multiple executors on a single machine), you will see no data traffic since the data is already in place.

If a data shuffle is required, executors will send data between each other and that will increase the traffic.
Data Exchange Between Nodes in a Spark Application
Just to elaborate on the traffic between nodes in a Spark application.
Broadcast variables are the means of sending data from the driver to executors.
Accumulators are the means of sending data from executors to the driver.
Operators like collect will pull all the remote blocks from executors to the driver.
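A compact sketch of all three (data made up, `sc` assumed from spark-shell):

```scala
// Driver -> executors: a read-only lookup table shipped once per executor.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Executors -> driver: a counter the tasks add to.
val misses = sc.longAccumulator("misses")

val resolved = sc.parallelize(Seq("a", "b", "c")).map { key =>
  lookup.value.getOrElse(key, { misses.add(1); -1 })
}

// Executors -> driver: collect pulls every remote block back to the driver.
println(resolved.collect().mkString(", ")) // 1, 2, -1
println(misses.value)                      // 1 (reliable only after the action above)
```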
p.s. Yeah, figures and diagrams would have helped...working on some. Upvote to get me inspired :)