In my Spark application I need to convert DataFrame
to .csv
file and put it to remote SFTP
server. I decided to use spark-sftp library for this task.
My sbt file looks like this:
import sbt.Keys.scalaVersion
name := "TEST"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.2"
val ENVIRONMENT_MODE = "development"
mainClass in Compile := Some("MainApp")
mainClass in (Compile, packageBin) := Some("MainApp")
mainClass in assembly := Some("MainApp")
assemblyJarName in assembly := ENVIRONMENT_MODE + "_test" + ".jar"
// Spark Packages from "bintray.com"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven/"
// "Spark Project SQL"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
// "Spark Project Core"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
// Current library is a PostgreSQL database connection JDBC4 driver.
libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"
// "scala-xml" is a Scala library for working with XML files.
libraryDependencies += "org.scala-lang.modules" %% "scala-xml" % "1.1.1"
// "Apache Commons VFS" is a virtual file system library.
libraryDependencies += "org.apache.commons" % "commons-vfs2" % "2.2"
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.jcraft" % "jsch" % "0.1.54",
"com.springml" %% "spark-sftp" % "1.1.3"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// The mapping of path names to merge strategies is done via the setting "assemblyMergeStrategy".
assemblyMergeStrategy in assembly := {
case PathList("META-INF", _ @ _*) => MergeStrategy.discard
case _ => MergeStrategy.last
}
I compile sbt file without any error. When I try to test next code it raise error.
import spark.sqlContext.implicits._
val df: DataFrame = Seq(
("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
println("Count: " + df.count()) // Next command show in console: 5
df.write
.format("com.springml.spark.sftp")
.option("host", "XXXX")
.option("username", "XXXX")
.option("password", "XXXX")
.option("fileType", "csv")
.option("delimiter", ";")
.option("codec", "bzip2")
.save("/reports/daily.csv")
ERROR:
Exception in thread "main" java.lang.NoSuchMethodError: com.springml.sftp.client.SFTPClient.<init>(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)V
at com.springml.spark.sftp.DefaultSource.getSFTPClient(DefaultSource.scala:186)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:122)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at report.CALL.runTask(CALL.scala:42)
at JobController.runJob(JobController.scala:38)
at MainApp$.main(MainApp.scala:74)
at MainApp.main(MainApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What the reason of the problem? What other solutions would you recommend?
jar tvf development_test.jar
command return hug result. I notice such lines in that response:
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/sftp/client/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/
0 Mon Jan 28 10:20:52 ALMT 2019 com/springml/spark/sftp/util/
2430 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/CryptoUtils.class
829 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/FileNameFilter.class
1375 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/ProgressMonitor.class
10308 Thu Jan 01 06:00:00 ALMT 1970 com/springml/sftp/client/SFTPClient.class
3361 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation$.class
11896 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DatasetRelation.class
1241 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anon$1.class
1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$1.class
1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$10.class
1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$11.class
1168 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$12.class
1387 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$13.class
1363 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$14.class
1391 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$15.class
1193 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$16.class
1194 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$17.class
1293 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$18.class
1271 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$19.class
1339 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$2.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$20.class
1192 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$21.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$22.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$23.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$24.class
1190 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$25.class
1494 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$26.class
1520 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$27.class
1367 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$3.class
1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$4.class
1166 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$5.class
1169 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$6.class
1170 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$7.class
1269 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$8.class
1248 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource$$anonfun$9.class
19336 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DefaultSource.class
1758 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/DeleteTempFileShutdownHook.class
848 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants$.class
871 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/constants.class
1041 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$.class
1637 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils$ImplicitDataFrameWriter.class
1366 Thu Jan 01 06:00:00 ALMT 1970 com/springml/spark/sftp/util/Utils.class