How convert DataFrame to csv file and put it to re

2019-08-28 06:35发布

问题:

In my Spark application I need to convert DataFrame to .csv file and put it to remote SFTP server. I decided to use spark-sftp library for this task.

My sbt file looks like this:

import sbt.Keys.scalaVersion

name := "TEST"

version := "0.1"

scalaVersion := "2.11.12"

val sparkVersion = "2.3.2"

val ENVIRONMENT_MODE = "development"

mainClass in Compile := Some("MainApp")

mainClass in (Compile, packageBin) := Some("MainApp")

mainClass in assembly := Some("MainApp")

assemblyJarName in assembly := ENVIRONMENT_MODE + "_test" + ".jar"

// Spark Packages from "bintray.com"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven/"

// "Spark Project SQL"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

// "Spark Project Core"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion

// Current library is a PostgreSQL database connection JDBC4 driver.
libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"

// "scala-xml" is a Scala library for working with XML files.
libraryDependencies += "org.scala-lang.modules" %% "scala-xml" % "1.1.1"

// "Apache Commons VFS" is a virtual file system library.
libraryDependencies += "org.apache.commons" % "commons-vfs2" % "2.2"

libraryDependencies ++= Seq(
  "org.scalatest"    %% "scalatest"  % "3.0.5" % "test",
  "com.jcraft"        % "jsch"       % "0.1.54",
  "com.springml"     %% "spark-sftp" % "1.1.3"
)

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)


// The mapping of path names to merge strategies is done via the setting "assemblyMergeStrategy".
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _ @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.last
}

I compile sbt file without any error. When I try to test next code it raise error.

import spark.sqlContext.implicits._

val df: DataFrame  = Seq(
  ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
  ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
  ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
  ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
  ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

println("Count: " + df.count()) // Next command show in console: 5

df.write
  .format("com.springml.spark.sftp")
  .option("host", "XXXX")
  .option("username", "XXXX")
  .option("password", "XXXX")
  .option("fileType", "csv")
  .option("delimiter", ";")
  .option("codec", "bzip2")
  .save("/reports/daily.csv")

ERROR:

Exception in thread "main" java.lang.NoSuchMethodError: com.springml.sftp.client.SFTPClient.<init>(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)V
        at com.springml.spark.sftp.DefaultSource.getSFTPClient(DefaultSource.scala:186)
        at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:122)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
        at report.CALL.runTask(CALL.scala:42)
        at JobController.runJob(JobController.scala:38)
        at MainApp$.main(MainApp.scala:74)
        at MainApp.main(MainApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What the reason of the problem? What other solutions would you recommend?


jar tvf development_test.jar command return hug result. I notice such lines in that response:

 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/
 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/
 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/client/
 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/
 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/
 0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/util/
2430 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/CryptoUtils.class
   829 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/FileNameFilter.class
  1375 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/ProgressMonitor.class
 10308 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/SFTPClient.class
  3361 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation$.class
 11896 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation.class
  1241 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anon$1.class
  1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$1.class
  1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$10.class
  1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$11.class
  1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$12.class
  1387 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$13.class
  1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$14.class
  1391 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$15.class
  1193 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$16.class
  1194 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$17.class
  1293 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$18.class
  1271 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$19.class
  1339 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$2.class
  1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$20.class
  1192 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$21.class
  1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$22.class
  1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$23.class
  1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$24.class
  1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$25.class
  1494 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$26.class
  1520 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$27.class
  1367 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$3.class
  1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$4.class
  1166 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$5.class
  1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$6.class
  1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$7.class
  1269 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$8.class
  1248 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$9.class
 19336 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource.class
  1758 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DeleteTempFileShutdownHook.class
   848 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants$.class
   871 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants.class
  1041 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$.class
  1637 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$ImplicitDataFrameWriter.class
  1366 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils.class

回答1:

I was able to get this work basically using your code, a few changes to the sbt file: Github Repo with full code sample

name := "test-sftp-upload"

version := "0.0.1"

scalaVersion := "2.11.12"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-sql"  % "2.3.2",
        "org.apache.spark" %% "spark-core" % "2.3.2",
        "com.jcraft"        % "jsch"       % "0.1.55",
        "org.scalatest"    %% "scalatest"  % "3.0.5" % "test",
        "com.springml"     %% "spark-sftp" % "1.1.3"
        )


// JAR file settings
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Nearly Identical app file:

package org.twelveHart.example.sftp

import org.apache.spark.sql.DataFrame

object sftpTest extends SparkSessionWrapper {
  def main(args: Array[String]): Unit = {

    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")

    val df: DataFrame = Seq(
      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

    df.printSchema()

    df.write
      .format("com.springml.spark.sftp")
      .option("host", "localhost")
      .option("username", "XXXXXX")
      .option("password", "XXXXXXX")
      .option("fileType", "csv")
      .option("delimiter", ";")
      .option("codec", "bzip2")
      .save("/tmp/daily.csv")

    spark.stop()
  }
}


回答2:

I notice that spark-sftp library (1.1.3) has several dependencies. On of them sftp-client (1.0.3) library. spark-sftp library use some method's of sftp-client library which was duplicated. Here is my code which works.

def runJob(): Unit ={
    try {
      val spark: SparkSession = initializeSpark()
      import spark.sqlContext.implicits._

      // Create DataFrame.
      val df: DataFrame  = Seq(("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"), ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"), ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"), ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"), ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
      df.show()

      // Create the object based on class "SFTPClient".
      val sftpClient = new SFTPClient(null, "username", "password", "host", 22)

      val tmpFolder = System.getProperty("java.io.tmpdir")
      val hdfsTemp = tmpFolder

      val source = writeToTemp(spark, df, hdfsTemp, tmpFolder, "csv", "true", ";", "rowTag", "rootTag")

      println("source: " + source)

      // Copy file to FTP server.
      sftpClient.copyToFTP(source, "/reports/example.csv")
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def writeToTemp(sparkSession: SparkSession, df: DataFrame, hdfsTemp: String, tempFolder: String, fileType: String, header: String, delimiter: String, rowTag: String, rootTag: String) : String = {

    val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
    val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
    val localTempLocation = tempFolder + File.separator + randomSuffix

    println("hdfsTempLocation: " + hdfsTempLocation)
    println("localTempLocation: " + localTempLocation)

    addShutdownHook(localTempLocation)

    df.coalesce(1).write.option("header", header).option("delimiter", delimiter).csv(hdfsTempLocation)

    copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation)

    println(copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation))

    copiedFile(localTempLocation)
  }

  def addShutdownHook(tempLocation: String) {
    println("Adding hook for file " + tempLocation)
    val hook = new DeleteTempFileShutdownHook(tempLocation)
    Runtime.getRuntime.addShutdownHook(hook)
  }

  def copyFromHdfs(sparkSession: SparkSession, hdfsTemp : String, fileLocation : String): String  = {
    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
    val hdfsPath = new Path(hdfsTemp)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
      fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
      fs.deleteOnExit(new Path(hdfsTemp))
      fileLocation
    } else {
      hdfsTemp
    }
  }

  def copiedFile(tempFileLocation: String) : String = {
    val baseTemp = new File(tempFileLocation)
    val files = baseTemp.listFiles().filter { x =>
      !x.isDirectory && !x.getName.contains("SUCCESS") && !x.isHidden && !x.getName.contains(".crc")
    }
    files(0).getAbsolutePath
  }

I removed information about codec option cause there was problems with charset in final csv file.