Executing HDFS commands from inside a Scala script

Posted 2019-08-13 02:30

Question:

I'm trying to execute an HDFS-specific command from inside a Scala script executed by Spark in cluster mode. Below is the command:

import scala.sys.process._

val cmd = Seq("hdfs", "dfs", "-copyToLocal", "/tmp/file.dat", "/path/to/local")
val result = cmd.!!

The job fails at this stage with the error below:

java.io.FileNotFoundException: /var/run/cloudera-scm-agent/process/2087791-yarn-NODEMANAGER/log4j.properties (Permission denied)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
        at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
        at org.apache.log4j.Logger.getLogger(Logger.java:104)
        at org.apache.commons.logging.impl.Log4JLogger.getLogger(Log4JLogger.java:262)
        at org.apache.commons.logging.impl.Log4JLogger.<init>(Log4JLogger.java:108)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

However, when I run the same command separately in the Spark shell, it executes just fine and the file is copied as well.

scala> val cmd = Seq("hdfs","dfs","-copyToLocal","/tmp/file_landing_area/file.dat","/tmp/local_file_area")
cmd: Seq[String] = List(hdfs, dfs, -copyToLocal, /tmp/file_landing_area/file.dat, /tmp/local_file_area)

scala> val result = cmd.!!
result: String = ""

I don't understand the permission-denied error, especially since it surfaces as a FileNotFoundException. Totally confusing.

Any ideas?

Answer 1:

Per the error, the process is trying to read a log4j.properties file under /var, which suggests a configuration issue or that it is not pointing to the correct file. Shelling out to the hdfs command via a Seq is not good practice: it is handy in the spark-shell, but using the same approach in application code is not advisable. Instead, try the Hadoop FileSystem API to move data to or from HDFS. Please check the sample code below, just for reference; it might help you.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()

val hdfspath = new Path("hdfs:///user/nikhil/test.csv")
val localpath = new Path("file:///home/cloudera/test/")

// Resolve the FileSystem for the HDFS path and copy the file down.
val fs = hdfspath.getFileSystem(conf)
fs.copyToLocalFile(hdfspath, localpath)
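
Inside a Spark job it is usually better to reuse the Hadoop configuration Spark has already loaded, so the FileSystem picks up the cluster's fs.defaultFS and related settings rather than a fresh default Configuration. A minimal sketch, assuming a SparkSession named spark is in scope and using the paths from the question:

import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse the Hadoop configuration Spark already carries for the cluster.
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)

// Same copy as the failing shell-out, expressed through the API.
fs.copyToLocalFile(new Path("/tmp/file.dat"), new Path("/path/to/local"))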

See the link below for more on the Hadoop FileSystem API:

https://hadoop.apache.org/docs/r2.9.0/api/org/apache/hadoop/fs/FileSystem.html#copyFromLocalFile(boolean,%20boolean,%20org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path)
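
The linked method handles the opposite direction; for completeness, a minimal sketch of copying a local file up to HDFS (both paths are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// Upload a local file to an HDFS directory; adjust the placeholder paths.
fs.copyFromLocalFile(new Path("file:///home/cloudera/test/test.csv"), new Path("hdfs:///user/nikhil/"))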