Spark: Difference between Shuffle Write, Shuffle s

Posted 2019-03-24 03:08

Question:

I have the following spark job, trying to keep everything in memory:

val myOutRDD = myInRDD.flatMap { fp =>
  val tuple2List: ListBuffer[(String, myClass)] = ListBuffer()
        :

  tuple2List
}.persist(StorageLevel.MEMORY_ONLY).reduceByKey { (p1, p2) =>
   myMergeFunction(p1,p2)
}.persist(StorageLevel.MEMORY_ONLY)

However, when I looked at the job tracker, I still saw a lot of Shuffle Write and Shuffle spill to disk ...

Total task time across all tasks: 49.1 h
Input Size / Records: 21.6 GB / 102123058
Shuffle write: 532.9 GB / 182440290
Shuffle spill (memory): 370.7 GB
Shuffle spill (disk): 15.4 GB

Then the job failed with "no space left on device" ... I am wondering whether the 532.9 GB of Shuffle write here is written to disk or to memory.

Also, why is 15.4 GB of data still spilled to disk when I specifically asked to keep it in memory?

Thanks!

Answer 1:

Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time when we spill it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after we spill it. This is why the latter tends to be much smaller than the former. Note that both metrics are aggregated over the entire duration of the task (i.e. within each task you can spill multiple times).
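The aggregation described above can be pictured with a toy model. This is only a sketch, not Spark's implementation: the names SpillAccountingSketch, Spill and totals are hypothetical, and the byte counts are made up to show that a task which spills several times reports the sum of all its spills for both metrics.

```scala
// Toy model of per-task spill accounting. This is NOT Spark's code;
// every name and number here is hypothetical.
object SpillAccountingSketch {
  // One spill: the estimated in-memory (deserialized) size of the data
  // and the size of its serialized form on disk.
  final case class Spill(memoryBytes: Long, diskBytes: Long)

  // Both metrics are sums over every spill that happened during the task.
  def totals(spills: Seq[Spill]): (Long, Long) =
    (spills.map(_.memoryBytes).sum, spills.map(_.diskBytes).sum)

  // Made-up sizes for a single task that spilled three times.
  val spills: Seq[Spill] = Seq(Spill(400L, 20L), Spill(380L, 15L), Spill(420L, 18L))

  val (spillMemory, spillDisk) = totals(spills)
}
```

In this model the memory figure is far larger than the disk figure simply because each spill's serialized size is much smaller than its in-memory size, which matches the 370.7 GB versus 15.4 GB pattern in the question.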



Answer 2:

The persist calls in your code are entirely wasted if you don't access the RDD multiple times. What's the point of storing something if you never access it? Caching has no bearing on shuffle behavior, other than that you can avoid re-doing shuffles by keeping their output cached.

Shuffle spill is controlled by the spark.shuffle.spill and spark.shuffle.memoryFraction configuration parameters. If spill is enabled (it is by default), shuffle data spills to disk once it starts using more than the fraction of memory given by memoryFraction (20% by default).
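As a sketch of how these two parameters could be set from code, assuming a Spark release that still uses the legacy memory settings (before 1.6, as far as I know; later releases use unified memory management and ignore them unless legacy mode is enabled). The object name, the application name and the value 0.4 are illustrative assumptions, not recommendations.

```scala
import org.apache.spark.SparkConf

object LegacyShuffleConf {
  // Legacy shuffle memory settings; only meaningful on Spark versions
  // that still honor spark.shuffle.memoryFraction.
  val conf: SparkConf = new SparkConf()
    .setAppName("shuffle-spill-example")        // hypothetical application name
    .set("spark.shuffle.spill", "true")         // allow spilling to disk (the default)
    .set("spark.shuffle.memoryFraction", "0.4") // illustrative value; the default is 0.2
}
```

Raising the fraction only moves the point at which spilling starts; it does not reduce the amount of data that has to be shuffled.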

The metrics are very confusing. My reading of the code is that "Shuffle spill (memory)" is the amount of memory that was freed up as things were spilled to disk. The code for "Shuffle spill (disk)" looks like it's the amount actually written to disk. From the code for "Shuffle write", I think it's the amount written to disk directly, not as a spill from a sorter.



Answer 3:

One more note on how to prevent shuffle spill, since I think that is the most important part of the question from a performance aspect (shuffle write, as mentioned above, is a required part of shuffling).

Spilling occurs when, at shuffle read, a reducer cannot fit all of the records assigned to it in the shuffle memory space on its executor. If your shuffle is unbalanced (e.g. some output partitions are much larger than some input partitions), you may have shuffle spill even if the partitions "fit in memory" before the shuffle. The best way to control this is by A) balancing the shuffle, e.g. changing your code to reduce before shuffling or shuffling on different keys, or B) changing the shuffle memory settings as suggested above. Given the extent of the spill to disk, you probably need to do A rather than B.
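To get a feel for what an unbalanced shuffle looks like, here is a minimal sketch in plain Scala (no Spark needed). It assigns keys to reducers the way hash partitioning does, using the non-negative key.hashCode modulo the partition count, and counts the records per partition. The object name, the sample keys and the partition count of 8 are all hypothetical.

```scala
// Sketch: how one hot key makes a hash-partitioned shuffle unbalanced.
// Plain Scala collections stand in for an RDD; all names are hypothetical.
object ShuffleSkewSketch {
  // Non-negative hashCode modulo the number of partitions.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of records each reducer partition would receive.
  def recordsPerPartition[K](keys: Seq[K], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionOf(k, numPartitions)).map { case (p, ks) => p -> ks.size }

  // Made-up sample: one key repeated 1000 times plus 1000 distinct keys.
  val sampleKeys: Seq[String] =
    Seq.fill(1000)("hot") ++ (1 to 1000).map(i => s"key-$i")

  val counts: Map[Int, Int] = recordsPerPartition(sampleKeys, 8)

  // The partition holding "hot" receives at least 1000 of the 2000 records.
  val largest: Int = counts.values.max
}
```

Running the same count on a sample of your real keys shows whether a few reducers receive most of the records, which is the situation where option A (different keys, or reducing before the shuffle) helps more than adding shuffle memory.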



Answer 4:


Shuffle write refers to data that has been written to a temporary cache location on your local file system. In YARN cluster mode, you can set this location with the "yarn.nodemanager.local-dirs" property in yarn-site.xml. Therefore, "shuffle write" is the size of the data you've written to that temporary location; "shuffle spill" is more likely your shuffle stage result. In any case, these figures are accumulated.