Spark: Writing RDD Results to File System is Slow

Published 2019-06-11 14:13

Question:

I'm developing a Spark application with Scala. My application consists of only one operation that requires shuffling (namely cogroup). It runs flawlessly and in a reasonable time. The issue I'm facing is when I want to write the results back to the file system; for some reason, it takes longer than running the actual program. At first, I tried writing the results without re-partitioning or coalescing, and I realized that the number of generated files was huge, so I thought that was the issue. I then tried re-partitioning (and coalescing) before writing, but the application took a long time performing these tasks. I know that re-partitioning (and coalescing) is costly, but is what I'm doing the right way? If it's not, could you please give me hints on the right approach?

Notes:

  • My file system is Amazon S3.
  • My input data size is around 130GB.
  • My cluster contains a driver node and five slave nodes, each with 16 cores and 64 GB of RAM.
  • I'm assigning 15 executors to my job, each with 5 cores and 19 GB of RAM (see the configuration sketch after this list).
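For reference, a minimal sketch of that executor layout expressed as Spark properties (in practice these are usually passed to spark-submit rather than hard-coded; the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Sketch only: executor sizing from the notes above.
val spark = SparkSession.builder()
  .appName("samples-cogroup")               // placeholder name
  .config("spark.executor.instances", "15") // 15 executors ...
  .config("spark.executor.cores", "5")      // ... with 5 cores each ...
  .config("spark.executor.memory", "19g")   // ... and 19 GB of heap each
  .getOrCreate()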

P.S. I tried using DataFrames; same issue (see the UPDATE below).

Here is a sample of my code just in case:

val sc = spark.sparkContext

// loading the samples
val samplesRDD = sc
  .textFile(s3InputPath)
  .filter(_.split(",").length > 7)
  .map(parseLine)
  .filter(_._1.nonEmpty) // skips any un-parsable lines


// pick random samples 
val samples1Ids = samplesRDD
  .map(_._2._1) // map to id
  .distinct
  .takeSample(withReplacement = false, 100, 0)

// broadcast it to the cluster's nodes
val samples1IdsBC = sc.broadcast(samples1Ids)

val samples1RDD = samplesRDD
  .filter(sample => samples1IdsBC.value.contains(sample._2._1))

val samples2RDD = samplesRDD
  .filter(sample => !samples1IdsBC.value.contains(sample._2._1))

// compute: for each key, pair every sample-1 entry with the ids of the sample-2 entries that are in range of it
samples1RDD
  .cogroup(samples2RDD)
  .flatMapValues { case (left, right) =>
    left.map(sample1 => (sample1._1, right.filter(sample2 => isInRange(sample1._2, sample2._2)).map(_._1)))
  }
  .map {
    case (timestamp, (sample1Id, sample2Ids)) =>
      s"$timestamp,$sample1Id,${sample2Ids.mkString(";")}"
  }
  .repartition(10)
  .saveAsTextFile(s3OutputPath)
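Regarding the repartition-vs-coalesce point above: a minimal sketch of the same final stage using coalesce, which merges existing partitions without the full shuffle that repartition(10) triggers (resultsRDD is a hypothetical name for the RDD produced by the cogroup/map chain above; the trade-off is that only 10 tasks do the writing):

// Sketch only: fewer output files without a full shuffle.
resultsRDD
  .coalesce(10)
  .saveAsTextFile(s3OutputPath)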

UPDATE

Here is the same code using Dataframes:

// required imports for this snippet (assuming `spark` is the active SparkSession)
import scala.collection.mutable
import org.apache.spark.sql
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

// loading the samples
val samplesDF = spark
  .read
  .csv(inputPath)
  .drop("_c1", "_c5", "_c6", "_c7", "_c8")
  .toDF("id", "timestamp", "x", "y")
  .withColumn("x", ($"x" / 100.0f).cast(sql.types.FloatType))
  .withColumn("y", ($"y" / 100.0f).cast(sql.types.FloatType))

// pick random ids as samples 1
val samples1Ids = samplesDF
  .select($"id") // map to the id
  .distinct
  .rdd
  .takeSample(withReplacement = false, 1000)
  .map(r => r.getAs[String]("id"))

// broadcast it to the executor
val samples1IdsBC = sc.broadcast(samples1Ids)

// get samples 1 and 2
val samples1DF = samplesDF
  .where($"id" isin (samples1IdsBC.value: _*))

val samples2DF = samplesDF
  .where(!($"id" isin (samples1IdsBC.value: _*)))

samples2DF
  .withColumn("combined", struct("id", "lng", "lat"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples1DF, Seq("timestamp"), "rightouter")
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], sample1Id: String, sample1X: Float, sample1Y: Float) =>
      val sample2Info = samples.filter {
        case Row(_, sample2X: Float, sample2Y: Float) =>
          Misc.isInRange((sample2X, sample2Y), (sample1X, sample1Y), 20)
        case _ => false
      }.map {
        case Row(sample2Id: String, sample2X: Float, sample2Y: Float) =>
          s"$sample2Id:$sample2X:$sample2Y"
        case _ => ""
      }.mkString(";")

      (timestamp, sample1Id, sample1X, sample1Y, sample2Info)
    case Row(timestamp: String, _, sample1Id: String, sample1X: Float, sample1Y: Float) => // no overlapping samples
      (timestamp, sample1Id, sample1X, sample1Y, "")
    case _ =>
      ("error", "", 0.0f, 0.0f, "")
  }
  .where($"_1" notEqual "error")
  //      .show(1000, truncate = false)
  .write
  .csv(outputPath)

Answer 1:

The issue here is that Spark normally commits tasks and jobs by renaming files, and on S3 renames are really, really slow. The more data you write, the longer it takes at the end of the job. That is what you are seeing.

Fix: switch to the S3A committers, which don't do any renames.

Some tuning options to massively increase the number of threads for IO and commits, and the connection pool size:

  • fs.s3a.threads.max: raise from the default of 10 to something bigger.
  • fs.s3a.committer.threads: number of files committed by a POST in parallel; default is 8.
  • fs.s3a.connection.maximum: try (fs.s3a.committer.threads + fs.s3a.threads.max + 10).

These defaults are all fairly small because many jobs work with multiple buckets, and if each had big numbers it would be really expensive to create an S3A client... but if you have many thousands of files, it's probably worthwhile.
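As an illustration only, a sketch of how these options can be routed through the Spark configuration (the spark.hadoop. prefix hands them to the S3A filesystem; the committer name and the numbers are placeholder assumptions, not recommendations, and a complete S3A committer setup depends on your Spark/Hadoop build):

import org.apache.spark.sql.SparkSession

// Sketch only: placeholder values, not tuned recommendations.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.committer.name", "directory")  // use an S3A committer instead of rename-based commit
  .config("spark.hadoop.fs.s3a.threads.max", "64")            // default is 10
  .config("spark.hadoop.fs.s3a.committer.threads", "32")      // default is 8
  .config("spark.hadoop.fs.s3a.connection.maximum", "106")    // roughly committer.threads + threads.max + 10
  .getOrCreate()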