Change output filename prefix for DataFrame.write(

2020-01-29 01:20发布

问题:

Output files generated via the Spark SQL DataFrame.write() method begin with the "part" basename prefix. e.g.

DataFrame sample_07 = hiveContext.table("sample_07");
sample_07.write().parquet("sample_07_parquet");

Results in:

hdfs dfs -ls sample_07_parquet/                                                                                                                                                             
Found 4 items
-rw-r--r--   1 rob rob          0 2016-03-19 16:40 sample_07_parquet/_SUCCESS
-rw-r--r--   1 rob rob        491 2016-03-19 16:40 sample_07_parquet/_common_metadata
-rw-r--r--   1 rob rob       1025 2016-03-19 16:40 sample_07_parquet/_metadata
-rw-r--r--   1 rob rob      17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet

I would like to change the output filename prefix used when creating a file using Spark SQL DataFrame.write(). I tried setting the "mapreduce.output.basename" property on the hadoop configuration for the Spark context. e.g.

public class MyJavaSparkSQL {

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("MyJavaSparkSQL");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ctx.hadoopConfiguration().set("mapreduce.output.basename", "myprefix");
    HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc());
    DataFrame sample_07 = hiveContext.table("sample_07");
    sample_07.write().parquet("sample_07_parquet");
    ctx.stop();
  }

That did not change the output filename prefix for the generated files.

Is there a way to override the output filename prefix when using the DataFrame.write() method?

回答1:

You cannot change the "part" prefix while using any of the standard output formats (like Parquet). See this snippet from ParquetRelation source code:

private val recordWriter: RecordWriter[Void, InternalRow] = {
  val outputFormat = {
    new ParquetOutputFormat[InternalRow]() {
      // ...
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        // ..
        //  prefix is hard-coded here:
        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
    }
  }
}

If you really must control the part file names, you'll probably have to implement a custom FileOutputFormat and use one of Spark's save methods that accept a FileOutputFormat class (e.g. saveAsHadoopFile).



回答2:

Assuming that the output folder have only one csv file in it, we can rename this grammatically (or dynamically) using the below code. In the below code (last line), get all files from the output directory with csv type and rename that to a desired file name.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val outputfolder_Path = "s3://<s3_AccessKey>:<s3_Securitykey>@<external_bucket>/<path>"     
val fs = FileSystem.get(new java.net.URI(outputfolder_Path), new Configuration())   
fs.globStatus(new Path(outputfolder_Path + "/*.*")).filter(_.getPath.toString.split("/").last.split("\\.").last == "csv").foreach{l=>{ fs.rename(new Path(l.getPath.toString), new Path(outputfolder_Path + "/DesiredFilename.csv")) }}