Save content of Spark DataFrame as a single CSV file

Published 2020-02-07 19:33

Question:

Say I have a Spark DataFrame which I want to save as a CSV file. Since Spark 2.0.0, the DataFrameWriter class directly supports saving it as a CSV file.

The default behavior is to save the output in multiple part-*.csv files inside the path provided.

How would I save a DF with:

  1. The path mapping to the exact file name instead of a folder
  2. The header available in the first line
  3. The output saved as a single file instead of multiple files

One way to deal with it is to coalesce the DF and then save the file.

df.coalesce(1).write.option("header", "true").csv("sample_file.csv")

However, this has the disadvantage of collecting all the data onto a single machine, which therefore needs enough memory to hold it.

Is it possible to write a single CSV file without using coalesce? If not, is there a more efficient way than the code above?

Answer 1:

Just solved this myself using PySpark with dbutils to get the .csv and rename it to the desired filename.

save_location= "s3a://landing-bucket-test/export/"+year
csv_location = save_location+'temp.folder'
file_location = save_location+'export.csv'

df.repartition(1).write.csv(path=csv_location, mode="append", header="true")

file = dbutils.fs.ls(csv_location)[-1].path
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)

This answer can be improved by not using [-1], but the .csv seems to always be last in the folder (a more robust selection is sketched below). It's a simple and fast solution if you only work with smaller files and can use repartition(1) or coalesce(1).
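One possible improvement is to filter the listing for the Spark part file by its suffix instead of relying on its position. The sketch below is in Scala (dbutils exposes the same fs calls from Scala notebooks on Databricks); the paths are hypothetical placeholders that mirror the variables above.

// pick the Spark-produced part file explicitly instead of taking the last listing entry
val csvLocation  = "s3a://landing-bucket-test/export/temp.folder"   // hypothetical staging folder, as above
val fileLocation = "s3a://landing-bucket-test/export/export.csv"    // hypothetical final name

val partFile = dbutils.fs.ls(csvLocation)
  .map(_.path)
  .find(p => p.contains("part-") && p.endsWith(".csv"))             // the actual CSV written by Spark
  .getOrElse(sys.error(s"no part-*.csv found under $csvLocation"))

dbutils.fs.cp(partFile, fileLocation)
dbutils.fs.rm(csvLocation, recurse = true)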



Answer 2:

Use: df.toPandas().to_csv("sample_file.csv", header=True)

See documentation for details: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.toPandas



Answer 3:

This solution is based on a shell script and is not parallelized, but it is still very fast, especially on SSDs. It uses cat and output redirection on Unix systems. Suppose the CSV directory containing the partitions is located at /my/csv/dir and the output file is /my/csv/output.csv:

#!/bin/bash
echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
    echo "Processing $i"
    cat "$i" >> /my/csv/output.csv
    rm "$i"
done
echo "Done"

It will remove each partition after appending it to the final CSV in order to free space.

"col1,col2,col3" is the CSV header (here we have three columns of name col1, col2 and col3). You must tell Spark to don't put the header in each partition (this is accomplished with .option("header", "false") because the Shell Script will do it.



Answer 4:

For those still wanting to do this, here's how I got it done using Spark 2.1 in Scala, with some java.nio.file help.

Based on https://fullstackml.com/how-to-export-data-frame-from-apache-spark-3215274ee9d6

    val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
    val file: java.nio.file.Path = ??? // target output file (i.e. 'out.csv')

    import java.nio.file.Files
    import scala.collection.JavaConversions._

    // write csv into temp directory which contains the additional spark output files
    // could use Files.createTempDirectory instead
    val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
    df.coalesce(1)
        .write.format("com.databricks.spark.csv")
        .option("header", "true")
        .save(tempDir.toAbsolutePath.toString)

    // find the actual csv file
    val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p => 
        val fname = p.getFileName.toString
        fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
    }.get

    // move to desired final path
    Files.move(tmpCsvFile, file)

    // delete temp directory
    Files.walk(tempDir)
        .sorted(java.util.Comparator.reverseOrder())
        .iterator().toSeq
        .foreach(Files.delete(_))


Answer 5:

The following Scala method works in local or client mode and writes the df to a single CSV with the chosen name. It requires that the df fit into memory, otherwise collect() will blow up.

import java.io.{BufferedWriter, OutputStreamWriter}

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, Row}

val SPARK_WRITE_LOCATION = some_directory        // staging directory on the cluster file system
val WRITE_DIRECTORY = some_local_directory       // final local destination directory
val SPARKSESSION = org.apache.spark.sql.SparkSession.builder.getOrCreate()

def saveResults(results: DataFrame, filename: String): Unit = {
    var fs = FileSystem.get(SPARKSESSION.sparkContext.hadoopConfiguration)

    if (SPARKSESSION.conf.get("spark.master").contains("local")) {
      fs = FileSystem.getLocal(new org.apache.hadoop.conf.Configuration())
    }

    val tempWritePath = new Path(SPARK_WRITE_LOCATION)

    if (fs.exists(tempWritePath)) {
      // clear any previous staging output
      val deleted = fs.delete(new Path(SPARK_WRITE_LOCATION), true)
      assert(deleted)
    }

    if (results.count > 0) {
      val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
      val writeStream = fs.create(hadoopFilepath, true)
      val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))

      // collect the whole DataFrame to the driver and write it row by row
      val rows = results.collect()
      for (row <- rows) {
        bw.write(row.mkString("", ",", "\n"))
      }

      bw.close()
      writeStream.close()

      val resultsWritePath = new Path(WRITE_DIRECTORY, filename)

      if (fs.exists(resultsWritePath)) {
        fs.delete(resultsWritePath, true)
      }
      // copy the single file from the staging location to the local destination
      fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
    } else {
      System.exit(-1)
    }
}



Answer 6:

FileUtil.copyMerge() from the Hadoop API should solve your problem.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

See Write single CSV file using spark-csv
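Note that FileUtil.copyMerge was removed in Hadoop 3.x, so on newer clusters the call above is no longer available. One possible replacement is to stream the part files into a single output yourself with the FileSystem API; a minimal, untested sketch (it assumes the part files concatenate correctly in name order and that no per-file headers need to be stripped):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

def manualMerge(srcDir: String, dstFile: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(dstFile), true)
  try {
    // append every part file to the destination, in name order
    fs.listStatus(new Path(srcDir))
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = fs.open(status.getPath)
        try IOUtils.copyBytes(in, out, conf, false)
        finally in.close()
      }
  } finally {
    out.close()
  }
}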



Answer 7:

This is how distributed computing works! Multiple files inside a directory are exactly what distributed computing produces; this is not a problem at all, since all software can handle it.

Your question should rather be "how is it possible to download a CSV composed of multiple files?" -> there are already lots of solutions on SO.

Another approach could be to use Spark as a JDBC source (with the awesome Spark Thrift server), write a SQL query, and transform the result to CSV.

To prevent OOM in the driver (since the driver will receive ALL the data), use incremental collect (spark.sql.thriftServer.incrementalCollect=true); more info at http://www.russellspitzer.com/2017/05/19/Spark-Sql-Thriftserver/.
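A rough sketch of that approach, assuming the Thrift server runs on the default localhost:10000 endpoint, the Hive JDBC driver is on the classpath, and a registered table named my_table exists (the table name and output path are hypothetical):

import java.io.PrintWriter
import java.sql.DriverManager

// connect to the Spark Thrift server over JDBC and dump a query result to a local CSV
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
val rs = conn.createStatement().executeQuery("SELECT * FROM my_table")
val meta = rs.getMetaData
val cols = 1 to meta.getColumnCount

val out = new PrintWriter("/tmp/my_table.csv")
try {
  out.println(cols.map(i => meta.getColumnName(i)).mkString(","))   // header row
  while (rs.next()) {
    // naive CSV rendering: no quoting or escaping of separators
    out.println(cols.map(i => Option(rs.getString(i)).getOrElse("")).mkString(","))
  }
} finally {
  out.close()
  conn.close()
}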


A small recap of Spark's "data partition" concept:

INPUT (X PARTITIONs) -> COMPUTING (Y PARTITIONs) -> OUTPUT (Z PARTITIONs)

Between "stages", data can be transferred between partitions, this is the "shuffle". You want "Z" = 1, but with Y > 1, without shuffle? this is impossible.



Answer 8:

df.coalesce(1).write.option("inferSchema", "true").csv("/newFolder", header = 'true', dateFormat = "yyyy-MM-dd HH:mm:ss")