Optimizing partitioned data writes to S3 in Spark

Posted 2019-07-08 19:24

Question:

I have around 700 GB of data that I read from HDFS in each Spark job run. The job reads this data, filters out around 60% of it, and partitions and writes it like this:

val toBePublishedSignals = hiveCtx.sql("some query")

toBePublishedSignals.write.partitionBy("A", "B", "C").format(JSON_DATA_FORMAT)
      .mode(SaveMode.Append).save(getS3DataPath())

val metadataFiles = hiveCtx.sql("some query")
metadataFiles.distinct().write.partitionBy("A", "C").format(JSON_DATA_FORMAT)
  .mode(SaveMode.Append).save(getS3MetadataPath())

The job gets stuck on the driver. I took a thread dump of the driver, and it is stuck at the following:

    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.retrieveObjectListing(AWSS3FileSystem.java:366)
    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.getFileStatus(AWSS3FileSystem.java:335)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:402)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    - locked <0x00000002d9b98288> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    - locked <0x00000002d9b98330> (a org.apache.spark.sql.execution.QueryExecution)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)

Looks like S3 listing is a big bottleneck. The job gets stuck for hours and does not complete.

Or is there any way I can store a path like S3://bucket/A=dvfw/B=wfwef in the dataframe, repartition the dataframe by that path, and then partitionBy only 'C' when writing to each path? I don't know how to do this without iterating through the entire dataframe; I would like to save the DF in one go.

Been on it since morning! Looking for some advice on how to handle this / avoid this!

TIA!

Answer 1:

As far as I remember, this situation happens when you write in append mode and there are already a lot of partitions in the final location: Spark retrieves the existing partitions (and probably their schemas), which requires listing the S3 location. I would suggest two possible solutions.

1) If you don't have a lot of partitions to write per execution, you can try the following:

// Imports needed by the snippets below
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// Prepare data and cache it.
// There is a lot of data, so part of it will most probably be written to disk.
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of the partition columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()

// Write each combination as a separate partition directory
partitions.foreach { p =>
    val a = p.getAs[String]("A")
    val b = p.getAs[String]("B")
    val c = p.getAs[String]("C")
    val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
    toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
                        .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}
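
Keep in mind that this loops over the cached DataFrame once per partition combination, which is why it only pays off when there are few combinations per run; the benefit is that every save call targets a single leaf directory, so the commit never has to list the whole partitioned tree on S3.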

And the same for the metadata:

// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of the partition columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()

// Write each combination as a separate partition directory
partitions.foreach { p =>
    val a = p.getAs[String]("A")
    val c = p.getAs[String]("C")
    val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
    metadataFiles.filter(col("A") === a && col("C") === c)
                 .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

I don't know the data types of the partition columns, so in my example they are strings. The code above is only an example; it can be rewritten in a more generic way, e.g. by folding over the partition columns and retrieving their data types from the DataFrame schema.
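
For illustration, here is a minimal sketch of what that generic variant could look like, assuming the partition values can be rendered into the path via their string representation, and reusing toBePublishedSignals, JSON_DATA_FORMAT and getS3DataPath() from the snippet above; partitionColumns is a name introduced only for this sketch.

// Hypothetical generic variant: fold over an arbitrary list of partition columns
// instead of hard-coding A/B/C.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, lit}

val partitionColumns = Seq("A", "B", "C")   // order defines the directory nesting
val combinations = toBePublishedSignals.selectExpr(partitionColumns: _*).distinct().collect()

combinations.foreach { row: Row =>
    // Fold the columns into a nested path: base/A=.../B=.../C=...
    val path = partitionColumns.foldLeft(new Path(getS3DataPath())) { (parent, name) =>
        new Path(parent, s"$name=${row.getAs[Any](name)}")
    }
    // Fold the columns into a single filter predicate
    val predicate = partitionColumns
        .map(name => col(name) === lit(row.getAs[Any](name)))
        .reduce(_ && _)

    toBePublishedSignals.filter(predicate)
        .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}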

2) As an option, it is possible to read the records from the partitions you are going to touch in the existing data and union them with the incoming records. Let's imagine that A/B/C are year/month/day, respectively. We have some new data, and the DataFrame produced by processing it (toBePublishedSignalsNew in the code below) contains the following rows:

2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3

This means that we need to read the following partitions from the location that contains the final data (the location returned by getS3DataPath()):

year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15

To do that, we need to create a filter function that is a combination of several smaller functions. We use reduce to combine them with the following logic:

year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15

// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")

// Create a filter function for querying the existing data
import org.apache.spark.sql.Row

// The partition columns used to build the filter
val partitionColumns = Seq("A", "B", "C")

val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
    partitionColumns.map { columnName =>
        (input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
    }.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)

// Read the existing partitions that match the incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)

// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
    .union(toBePublishedSignalsNew)
    .write
    .partitionBy("A", "B", "C")
    .format(JSON_DATA_FORMAT)
    .mode(SaveMode.Overwrite)
    .save(temporaryLocationS3)

After that you will need to replace the partitions in the location returned by getS3DataPath() with the ones written to temporaryLocationS3. The example above will work only if the partition columns contain strings; if they have other data types, you will probably have to add some mapping for the filter functions. For example, for IntegerType it would look like:

(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)
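
For the partition-replacement step, here is a minimal sketch using the Hadoop FileSystem API. It assumes temporaryLocationS3 and getS3DataPath() from above, that both locations live on the same filesystem, and that there are exactly three partition levels (A/B/C); partitionDirs is a helper introduced only for this sketch. Note that on S3 a rename is typically implemented by the connector as copy-plus-delete, so for large partitions a dedicated tool such as S3DistCp may be a better fit.

// Sketch: replace the matching partition directories in the final location
// with the ones produced in the temporary location.
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
val tmpBase    = new Path(temporaryLocationS3)
val finalBase  = new Path(getS3DataPath())
val fs: FileSystem = tmpBase.getFileSystem(hadoopConf)

// Collect partition directories 'depth' levels below 'base' (A=.../B=.../C=...)
def partitionDirs(base: Path, depth: Int): Seq[Path] =
    if (depth == 0) Seq(base)
    else fs.listStatus(base).filter(_.isDirectory).map(_.getPath).toSeq
           .flatMap(partitionDirs(_, depth - 1))

partitionDirs(tmpBase, 3).foreach { tmpDir =>
    // Rebuild the relative partition path, e.g. A=.../B=.../C=...
    val relative = tmpDir.toString.stripPrefix(tmpBase.toString).stripPrefix("/")
    val target   = new Path(finalBase, relative)
    fs.delete(target, true)            // drop the old version of the partition, if any
    fs.mkdirs(target.getParent)        // make sure the parent directories exist
    fs.rename(tmpDir, target)          // move the new partition into place
}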