How to enumerate files in HDFS directory

Posted 2019-02-26 07:19

Question:

How do I enumerate the files in an HDFS directory? This is for enumerating files in an Apache Spark cluster using Scala. I see there is the sc.textFile() option, but that reads the contents as well. I want to read only the file names.

I actually tried listStatus, but it didn't work and I get the error below. I am using Azure HDInsight Spark, and the blob store folder "testContainer@testhdi.blob.core.windows.net/example/" contains .json files.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("wasb://testContainer@testhdi.blob.core.windows.net/example/"))
status.foreach(x => println(x.getPath))

=========
Error:
=========
java.io.FileNotFoundException: Filewasb://testContainer@testhdi.blob.core.windows.net/example does not exist.
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.listStatus(NativeAzureFileSystem.java:2076)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
    at $iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC.<init>(<console>:30)
    at $iwC.<init>(<console>:32)
    at <init>(<console>:34)
    at .<init>(<console>:38)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at com.cloudera.livy.repl.scalaRepl.SparkInterpreter$$anonfun$executeLine$1.apply(SparkInterpreter.scala:272)
    at com.cloudera.livy.repl.scalaRepl.SparkInterpreter$$anonfun$executeLine$1.apply(SparkInterpreter.scala:272)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.Console$.withOut(Console.scala:126)
    at com.cloudera.livy.repl.scalaRepl.SparkInterpreter.executeLine(SparkInterpreter.scala:271)
    at com.cloudera.livy.repl.scalaRepl.SparkInterpreter.executeLines(SparkInterpreter.scala:246)
    at com.cloudera.livy.repl.scalaRepl.SparkInterpreter.execute(SparkInterpreter.scala:104)
    at com.cloudera.livy.repl.Session.com$cloudera$livy$repl$Session$$executeCode(Session.scala:98)
    at com.cloudera.livy.repl.Session$$anonfun$3.apply(Session.scala:73)
    at com.cloudera.livy.repl.Session$$anonfun$3.apply(Session.scala:73)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Thanks!

Answer 1:

This fails because FileSystem.get(new Configuration()) resolves against the cluster's default storage container rather than testContainer, so the example folder is not found there. You can verify this by changing the path to wasb://testContainer@testhdi.blob.core.windows.net/; it will list files from a different container.

I don't know why this is, but I discovered you can fix it by passing the path URI to the FileSystem.get call, like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new java.net.URI("wasb://testContainer@testhdi.blob.core.windows.net/example/"), new Configuration())
val status = fs.listStatus(new Path("wasb://testContainer@testhdi.blob.core.windows.net/example/"))
status.foreach(x => println(x.getPath))
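
Since the folder in the question holds .json files, a glob pattern can narrow the listing to just those. A minimal sketch, assuming the same fs handle created above (globStatus is a standard FileSystem method):

// Hedged sketch: match only the .json files under the folder and print bare names.
// Assumes `fs` was created with the URI-based FileSystem.get call above.
val jsonFiles = fs.globStatus(new Path("wasb://testContainer@testhdi.blob.core.windows.net/example/*.json"))
jsonFiles.foreach(s => println(s.getPath.getName)) // file name only, no scheme or directories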


Answer 2:

See the FileSystem class:

abstract FileStatus[] listStatus(Path f)

List the statuses of the files/directories in the given path if the path is a directory.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path(HDFS_PATH))
status.foreach(x => println(x.getPath))
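
Note that listStatus only returns the immediate children of the directory. If you also need files in sub-directories, a minimal sketch of a recursive walk, reusing the HDFS_PATH placeholder and the fs handle above:

// Hedged sketch: FileSystem.listFiles(path, recursive = true) walks sub-directories
// and returns a RemoteIterator over the files it finds.
val it = fs.listFiles(new Path(HDFS_PATH), true)
while (it.hasNext) println(it.next().getPath)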

Note: the HDFS API is accessible from any JVM language, such as Java or Scala. Below is the same listing in Java, wrapped in an illustrative class so it compiles standalone.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative wrapper class so the snippet compiles standalone.
public class FileLister {

    private static final Logger LOG = LoggerFactory.getLogger(FileLister.class);

    /**
     * Logs the status and human-readable size of every entry under destination.
     */
    public static void listFileStats(final String destination, final FileSystem fs)
            throws FileNotFoundException, IOException {
        final FileStatus[] statuses = fs.listStatus(new Path(destination));
        for (final FileStatus status : statuses) {
            LOG.info("--  status {}", status.toString());
            LOG.info("Human readable size {} of file", FileUtils.byteCountToDisplaySize(status.getLen()));
        }
    }
}
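
If, as the question asks, only the file names are needed, a minimal Scala sketch, assuming the fs handle and HDFS_PATH placeholder from the Scala snippet above:

// Minimal sketch: keep regular files only and print their bare names.
// Assumes `fs` and HDFS_PATH from the Scala snippet above.
fs.listStatus(new Path(HDFS_PATH))
  .filter(_.isFile)           // skip sub-directories
  .map(_.getPath.getName)     // bare file name, no scheme or parent path
  .foreach(println)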