I have around 700 GB of data that I read from HDFS on each Spark job run. The job reads this data, filters out around 60% of it, and writes it to S3 partitioned like this:
val toBePublishedSignals = hiveCtx.sql("some query")
toBePublishedSignals.write.partitionBy("A", "B", "C").format(JSON_DATA_FORMAT)
  .mode(SaveMode.Append).save(getS3DataPath())

val metadataFiles = hiveCtx.sql("some query")
metadataFiles.distinct().write.partitionBy("A", "C").format(JSON_DATA_FORMAT)
  .mode(SaveMode.Append).save(getS3MetadataPath())
The job gets stuck on the driver. I took a thread dump of the driver, and it is stuck at the following:
at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.retrieveObjectListing(AWSS3FileSystem.java:366)
at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.getFileStatus(AWSS3FileSystem.java:335)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:402)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
- locked <0x00000002d9b98288> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
- locked <0x00000002d9b98330> (a org.apache.spark.sql.execution.QueryExecution)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
It looks like S3 listing during the output commit is a big bottleneck: the job sits there for hours and never completes.
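For what it's worth, one thing I have come across but not yet verified for my setup is the FileOutputCommitter algorithm version 2 (MAPREDUCE-4815), which is supposed to move task output into place at task commit instead of in the single-threaded mergePaths pass the dump shows the driver stuck in:

// Untested idea: algorithm version 2 makes commitJob cheap by renaming task
// output into its final location at task commit time, so the driver no
// longer walks the whole output tree. Whether this helps with our custom
// AWSS3FileSystem is exactly what I am unsure about.
sc.hadoopConfiguration.set(
  "mapreduce.fileoutputcommitter.algorithm.version", "2")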
Alternatively, is there any way I can store the target path (something like s3://bucket/A=dvfw/B=wfwef) as a column in the DataFrame, repartition the DataFrame by that path, and then partitionBy only "C" when writing each group to its path? I don't see how to do that without iterating over the entire DataFrame, and I would like to save the DF in one go.
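To make that concrete, the only way I can see to do it right now is something like the sketch below (untested, reusing toBePublishedSignals and the helpers from the snippet above; the driver-side loop is exactly the part I want to avoid):

import org.apache.spark.sql.functions.{col, concat, lit}

// Rough sketch, not tested: precompute the A=.../B=... prefix as a column,
// then write each group separately with partitionBy("C") only.
val withPath = toBePublishedSignals.withColumn(
  "path", concat(lit("A="), col("A"), lit("/B="), col("B")))

// This loop is the problem: one Spark write job (and one slow S3 commit)
// per distinct (A, B) combination, instead of a single save of the whole DF.
withPath.select("path").distinct().collect().foreach { row =>
  val prefix = row.getString(0)
  withPath.filter(col("path") === prefix).drop("path")
    .write.partitionBy("C").format(JSON_DATA_FORMAT)
    .mode(SaveMode.Append)
    .save(getS3DataPath() + "/" + prefix)
}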
I've been at this since morning! Looking for advice on how to handle or avoid this!
TIA!