Read file on remote machine in Apache Spark using FTP

Posted 2019-05-07 09:26

I am trying to read a file on a remote machine in Apache Spark (the Scala version) using FTP. So far, I have followed an example in Databricks' Learning Spark repo on GitHub. Using curl, I am able to download the file, so the path I use exists.

Below is a snippet of the code I am trying to execute:

// Run in spark-shell, where sc and the toDF() implicits are already available.
val file = sc.textFile("ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt")
val fileDF = file.toDF()
fileDF.write.parquet("out")

After trying to execute a count on the DataFrame, I get the following stack trace (http://pastebin.com/YEq8c2Hf):

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#4L])
      +- Project
         +- Scan ExistingRDD[_1#0]

...

Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt

This would suggest that the file is unreachable, but that contradicts the fact that I am able to retrieve the file via curl:

curl ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt

This prints the contents of the file to my terminal. I do not see what I am doing wrong in the Scala code. Is there an error in the code snippet I gave above, or is the whole approach wrong?
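One way to rule out a malformed URL before involving Spark is to check that it parses into the user info, host, and path one expects. The following is a minimal sketch using only java.net.URI from the JDK; FtpUrlCheck, FtpParts, and parse are hypothetical names introduced for illustration and are not part of Spark. It also shows that a URL whose credentials are followed by "/" instead of "@" still parses, but yields no host.

```scala
import java.net.URI

object FtpUrlCheck {
  // Hypothetical helper type (not part of Spark): the pieces of an FTP URL
  // that an FTP client needs. Components that cannot be parsed are None.
  final case class FtpParts(userInfo: Option[String], host: Option[String], path: String)

  // Parse the URL with the JDK's URI class and wrap nullable fields in Option.
  def parse(url: String): FtpParts = {
    val uri = new URI(url)
    FtpParts(Option(uri.getUserInfo), Option(uri.getHost), uri.getPath)
  }

  def main(args: Array[String]): Unit = {
    // URL in the form used by the curl command and reported in the stack trace.
    println(parse("ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"))
    // Same URL with "/" in place of "@": no user info or host can be extracted.
    println(parse("ftp://user:pwd/192.168.1.5/brecht-d-m/map/input.nt"))
  }
}
```

If the host comes back empty, the URL string itself is the first thing to correct before looking at Spark or the FTP server.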

Thanks in advance, Brecht

Note:

  • Specifying the full path (/home/brecht-d-m/map/input.nt) does not work either. That is expected, since it also fails in curl ("server denied you to change to the given directory"). In Spark, it results in an IOException saying that seek is not supported (http://pastebin.com/b9EB9ru2).

  • Reading the file locally (e.g. sc.textFile("/home/brecht-d-m/map/input.nt")) works perfectly.

  • The file's permissions are set to read and write for all users.

  • The file size (15 MB) should not be a problem; Spark should be able to handle much bigger files.

  • Software versions: Scala 2.11.7, Apache Spark 1.6.0, Java 1.8.0_74, Ubuntu 14.04.4

1 answer
beautiful°
#2 · 2019-05-07 10:13

I was able to find a workaround, shown in the code snippet below:

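import org.apache.spark.SparkFiles

// Same FTP URL as in the question: credentials, then "@", then the host.
val dataSource = "ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"
// Download the file so that a local copy is available.
sc.addFile(dataSource)
// Look up the local path of the download by its file name.
val fileName = SparkFiles.get(dataSource.split("/").last)
// Load the local copy into an RDD.
val file = sc.textFile(fileName)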

With this snippet I am able to download the file over FTP (using the same URL as in the first code snippet). The workaround first downloads the file (via addFile). Next, I retrieve the path where the file was downloaded. Finally, I use that path to load the file into an RDD.
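The lookup step depends on passing SparkFiles.get the bare file name, which the snippet above takes from the last segment of the URL. A minimal sketch of that step in isolation, assuming the URL does not end in a slash; DownloadedName and fileNameOf are hypothetical names used for illustration, not part of Spark:

```scala
import java.net.URI

object DownloadedName {
  // Hypothetical helper (not a Spark API): returns the last segment of the
  // URL's path component, ignoring credentials and host.
  def fileNameOf(url: String): String = {
    val path = new URI(url).getPath
    path.substring(path.lastIndexOf('/') + 1)
  }

  def main(args: Array[String]): Unit = {
    // Print the name that would be handed to SparkFiles.get.
    println(fileNameOf("ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"))
  }
}
```

With such a helper, the lookup would read SparkFiles.get(DownloadedName.fileNameOf(dataSource)).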
