
Shuffled vs non-shuffled coalesce in Apache Spark

Posted 2020-07-13 07:59

Question:

What is the difference between the following transformations when they are executed right before writing RDD to a file?

  1. coalesce(1, shuffle = true)
  2. coalesce(1, shuffle = false)

Code example:

val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)

mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)

And how do they compare with collect()? I'm fully aware that Spark save methods will store the output with an HDFS-style directory structure; however, I'm more interested in the data partitioning aspects of collect() and shuffled/non-shuffled coalesce().

Answer 1:

shuffle = true and shuffle = false won't make any practical difference to the resulting output, since both end up with a single partition. However, when you set it to true you perform a shuffle, which is of no use here. With shuffle = true the output is evenly distributed amongst the partitions (and you're also able to increase the number of partitions if you want), but since your target is 1 partition, everything ends up in one partition regardless.
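A quick spark-shell sketch (not part of the original answer; the RDD name and sizes are arbitrary) illustrates this with getNumPartitions:

val rdd = sc.parallelize(1 to 100, 5)

// Both end up with a single partition:
rdd.coalesce(1, shuffle = false).getNumPartitions  // 1, simple merge of parent partitions
rdd.coalesce(1, shuffle = true).getNumPartitions   // 1, but reached via a shuffle

// Only the shuffling variant can increase the partition count:
rdd.coalesce(10, shuffle = false).getNumPartitions // stays at 5
rdd.coalesce(10, shuffle = true).getNumPartitions  // 10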

As for the comparison with collect(): with coalesce(1) all of the data ends up on a single executor, whereas collect() pulls it back to the driver.
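To make that contrast concrete, a hedged sketch (reusing the mapped RDD from the question) of where the data ends up:

// coalesce(1) + save: one task on one executor holds and writes all the data.
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)

// collect(): every partition is pulled back into the driver JVM as a local array,
// so the driver needs enough memory for the whole dataset.
val local = mapped.collect()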



Answer 2:

By looking at the coalesce documentation for Spark 2.3.1, https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/rdd/RDD.html#coalesce-int-boolean-scala.Option-scala.math.Ordering-

It looks more convenient to add shuffle = true when you are reducing the number of partitions to 1, to avoid the computation taking place on fewer nodes than you would like. This adds a shuffle step, but it means the current upstream partitions will be executed in parallel.
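One way to see the extra shuffle step (a small sketch, not from the answer) is to inspect the RDD lineage with toDebugString:

val rdd = sc.parallelize(1 to 100, 10)

rdd.coalesce(1, shuffle = false).toDebugString // a CoalescedRDD directly on top of the parent
rdd.coalesce(1, shuffle = true).toDebugString  // includes a ShuffledRDD, i.e. an extra stage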



Answer 3:

coalesce(n, shuffle = true), which is also equivalent to repartition(n), may have a considerable effect on how your job performs, depending on the mapping or other processing logic in your parent RDDs.
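In the RDD API that equivalence is literal: repartition(n) is implemented as coalesce(n, shuffle = true). A minimal sketch of the two spellings (variable names are illustrative):

val rdd = sc.parallelize(1 to 100, 10)

val viaCoalesce    = rdd.coalesce(1, shuffle = true) // shuffles, parents stay parallel
val viaRepartition = rdd.repartition(1)              // same operation under the hood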

In general, when the data in your parent partitions is evenly distributed and you are not drastically decreasing the number of partitions, you should avoid the shuffle when using coalesce.

However, in your case this is a substantial reduction in the number of partitions, and as per the documentation:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is)

Given that, you now need to properly assess the trade-off and choose between:

  • Shuffling potentially huge amounts of data but doing computations in parent partitions in parallel
  • Collecting all the partitions into one without full reshuffling (there will still of course be data movements) but doing computations within a single task

For example, consider the following snippets, which are far from the actual logic you may have but will give you a perspective on what is happening:

// fast
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = true)
  .toDF.write.text("shuffleTrue")
// slow
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = false)
  .toDF.write.text("shuffleFalse")

On my cluster, the run with shuffle = true showed a total time of roughly 5 seconds, with 10 tasks performing the computation logic on each parent partition in parallel. The one with shuffle = false took roughly 50 seconds, doing all the computation within a single task on one executor.
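If you want to reproduce the comparison yourself without writing output files, here is a hedged variant (assumes spark-shell; the timed helper is purely illustrative) that forces evaluation with count():

def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

timed("shuffle = true") {
  sc.parallelize(0 to 1000000, 10)
    .mapPartitions(it => { Thread.sleep(5000); it.map(_.toString) })
    .coalesce(1, shuffle = true)
    .count() // upstream partitions still run as 10 parallel tasks
}

timed("shuffle = false") {
  sc.parallelize(0 to 1000000, 10)
    .mapPartitions(it => { Thread.sleep(5000); it.map(_.toString) })
    .coalesce(1, shuffle = false)
    .count() // everything collapses into a single task
}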