Save content of Spark DataFrame as a single CSV file

Say I have a Spark DataFrame which I want to save as a CSV file. Since Spark 2.0.0, the DataFrameWriter class directly supports saving it as a CSV file.

The default behavior is to save the output in multiple part-*.csv files inside the path provided.

How would I save a DataFrame with:

  1. Path mapping to the exact file name instead of folder
  2. Header available in first line
  3. Save as a single file instead of multiple files.

One way to deal with it is to coalesce the DataFrame and then save the file.

df.coalesce(1).write.option("header", "true").csv("sample_file.csv")

However, this has the disadvantage of collecting the data on the master machine, which needs to have enough memory.

Is it possible to write a single CSV file without using coalesce? If not, is there a more efficient way than the above code?


Just solved this myself using pyspark with dbutils to get the .csv and rename to the wanted filename.

save_location = "s3a://landing-bucket-test/export/" + year
csv_location = save_location + "temp.folder"
file_location = save_location + "export.csv"

df.repartition(1).write.csv(path=csv_location, mode="append", header="true")

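# list the temp folder and take the last entry, which is expected to be the part-*.csv file
file = dbutils.fs.ls(csv_location)[-1].path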
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)

This answer can be improved by not using [-1], but the .csv seems to always be last in the folder. Simple and fast solution if you only work on smaller files and can use repartition(1) or coalesce(1).
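
One way to avoid relying on [-1] is to select the part file by name. The following is only a sketch: pick_part_file is a hypothetical helper (not part of dbutils), and it assumes that each listing entry exposes a .path attribute, as the objects returned by dbutils.fs.ls do, and that Spark names its data files part-*.csv.

```python
import posixpath


def pick_part_file(entries):
    """Return the path of the single part-*.csv entry in a directory listing.

    `entries` is any iterable of objects with a `.path` attribute
    (assumed to match what dbutils.fs.ls returns).
    """
    matches = [
        e.path
        for e in entries
        if posixpath.basename(e.path.rstrip("/")).startswith("part-")
        and e.path.endswith(".csv")
    ]
    # With mode="append" the folder may hold several part files; fail loudly
    # instead of silently copying the wrong one.
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]
```

With this helper the lookup would become file = pick_part_file(dbutils.fs.ls(csv_location)). It raises an error if the temp folder contains zero or several part files, which can happen when writing with mode="append" into a folder that already has data.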


Use: df.toPandas().to_csv("sample_file.csv", header=True)
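
Keep in mind that toPandas() collects the whole DataFrame into driver memory. If that is a concern, a possible variant is to stream the rows into a local file instead. The sketch below makes some assumptions: write_rows_to_csv is a hypothetical helper name, and it accepts any iterable of row tuples, so it could be fed with df.toLocalIterator(), which PySpark provides to fetch rows to the driver one partition at a time.

```python
import csv


def write_rows_to_csv(rows, header, path):
    """Write an iterable of row tuples to one CSV file, header on the first line."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in rows:
            # Spark Row objects are tuples, so they can be written directly
            writer.writerow(row)


# Hypothetical usage with a Spark DataFrame `df`:
# write_rows_to_csv(df.toLocalIterator(), df.columns, "sample_file.csv")
```

This is slower than a parallel write, but only one partition at a time has to fit on the driver.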

See documentation for details:


This solution is based on a shell script and is not parallelized, but is still very fast, especially on SSDs. It uses cat and output redirection on Unix systems. Suppose that the CSV directory containing the partitions is located at /my/csv/dir and that the output file is /my/csv/output.csv:

echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
    echo "Processing $i"
    cat "$i" >> /my/csv/output.csv
    rm "$i"
done
echo "Done"

It will remove each partition after appending it to the final CSV in order to free space.

"col1,col2,col3" is the CSV header (here we have three columns named col1, col2 and col3). You must tell Spark not to put the header in each partition (this is accomplished with .option("header", "false")), because the shell script will write it.

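For environments without a Unix shell, the same concatenation can be sketched in Python with the standard library only. This is an illustration, not part of the answer above: merge_csv_parts and its parameters are hypothetical names, and it assumes the partitions were written without headers and that each one ends with a newline.

```python
import glob
import os
import shutil


def merge_csv_parts(parts_dir, output_path, header, delete_parts=False):
    """Concatenate parts_dir/*.csv into output_path after a single header line.

    Returns the number of partition files that were merged.
    """
    part_files = sorted(glob.glob(os.path.join(parts_dir, "*.csv")))
    merged = 0
    with open(output_path, "wb") as out:
        out.write((header + "\n").encode("utf-8"))
        for part in part_files:
            # never append the output file to itself
            if os.path.abspath(part) == os.path.abspath(output_path):
                continue
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)
            if delete_parts:
                # free space as soon as the partition has been appended
                os.remove(part)
            merged += 1
    return merged
```

A call such as merge_csv_parts("/my/csv/dir", "/my/csv/output.csv", "col1,col2,col3", delete_parts=True) would mirror the script, with partitions appended in sorted file-name order.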

For those still wanting to do this here's how I got it done using spark 2.1 in scala with some java.nio.file help.

Based on

    val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
    val file: java.nio.file.Path = ??? // target output file (i.e. 'out.csv')

    import java.nio.file.Files
    import scala.collection.JavaConversions._

    // write csv into temp directory which contains the additional spark output files
    // could use Files.createTempDirectory instead
    val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
    df.coalesce(1)
        .write
        .option("header", "true")
        .csv(tempDir.toAbsolutePath.toString)

    // find the actual csv file
    val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p =>
        val fname = p.getFileName.toString
        fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
    }.get

    // move to desired final path
    Files.move(tmpCsvFile, file)

    // delete temp directory (children first, then the directory itself)
    Files.walk(tempDir).iterator().toSeq.reverse.foreach(p => Files.delete(p))


The following Scala method works in local or client mode, and writes the df to a single CSV of the chosen name. It requires that the df fit into driver memory, otherwise collect() will blow up.

import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

val SPARK_WRITE_LOCATION = some_directory
val SPARKSESSION = SparkSession.builder.getOrCreate()
// WRITE_DIRECTORY (the final destination directory) is assumed to be defined elsewhere

def saveResults(results: DataFrame, filename: String): Unit = {
    var fs = FileSystem.get(SPARKSESSION.sparkContext.hadoopConfiguration)
    if (SPARKSESSION.conf.get("spark.master").toString.contains("local")) {
      fs = FileSystem.getLocal(new Configuration())
    }
    // start from an empty temporary write location
    val tempWritePath = new Path(SPARK_WRITE_LOCATION)
    if (fs.exists(tempWritePath)) {
      fs.delete(tempWritePath, true)
    }
    if (results.count > 0) {
      val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
      val writeStream = fs.create(hadoopFilepath, true)
      val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))
      // collect() pulls every row to the driver
      val rows = results.collect()
      for (row: Row <- rows) {
        val rowString = row.mkString(start = "", sep = ",", end = "\n")
        bw.write(rowString)
      }
      bw.close() // also closes the underlying stream
      val resultsWritePath = new Path(WRITE_DIRECTORY, filename)
      if (fs.exists(resultsWritePath)) {
        fs.delete(resultsWritePath, true)
      }
      fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
    } else {
      // empty DataFrame: nothing is written
    }
}


FileUtil.copyMerge() from the Hadoop API should solve your problem. Note that it is available in Hadoop 2.x but was removed in Hadoop 3.0.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit = {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   // the "true" setting deletes the source files once they are merged into the new output
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
}

See Write single CSV file using spark-csv


This is how distributed computing works! Multiple files inside a directory are exactly what a distributed job produces; this is not a problem at all, since all software can handle it.

Your question should be "how is it possible to download a CSV composed of multiple files?" -> there are already lots of solutions on SO.

Another approach could be to use Spark as a JDBC source (with the awesome Spark Thrift server), write a SQL query and transform the result to CSV.

In order to prevent OOM in the driver (since the driver will get ALL the data), use incremental collect (spark.sql.thriftServer.incrementalCollect=true), more info at

Small recap about Spark "data partition" concept:


Between "stages", data can be transferred between partitions; this is the "shuffle". You want "Z" = 1, but with Y > 1, without a shuffle? This is impossible.


df.coalesce(1).write.option("inferSchema", "true").csv("/newFolder", header='true', dateFormat="yyyy-MM-dd HH:mm:ss")