Read multiple files from a directory using Spark

Posted 2020-03-31 08:02

Question:

I am trying to solve this problem on Kaggle using Spark:

the hierarchy of input is like this :

drivers/{driver_id}/trip#.csv
e.g., drivers/1/1.csv
      drivers/1/2.csv
      drivers/2/1.csv

I want to read the parent directory "drivers", and for each subdirectory I would like to create a pair RDD with the key as (sub_directory, file_name) and the value as the content of the file.

I checked this link and tried to use

val text = sc.wholeTextFiles("drivers")
text.collect()

This failed with the error:

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884)

But when I run the code below, it works:

val text =  sc.wholeTextFiles("drivers/1")
text.collect()

But I don't want to do this, since I would then have to list the drivers directory myself, loop over the files, and call wholeTextFiles for each entry.
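
For illustration, the pairing I am after could be produced from the wholeTextFiles output roughly like this (a sketch that assumes reading the parent directory, e.g. via a glob such as "drivers/*", eventually works):

val text = sc.wholeTextFiles("drivers/*")

// Re-key each (full_path, content) pair as ((sub_directory, file_name), content)
val keyed = text.map { case (path, content) =>
  val parts    = path.split("/")
  val fileName = parts.last                // e.g. "1.csv"
  val subDir   = parts(parts.length - 2)   // e.g. "1", the driver_id folder
  ((subDir, fileName), content)
}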

Answer 1:

Instead of using

sc.textFile("path/*/**") or sc.wholeTextFiles("path/*")

you can use the piece of code below. With glob patterns, Spark internally lists every matching folder and subfolder, which can cost you time on large datasets. Instead, you can read each file separately and union the results for the same purpose.

Pass a List object containing the file locations to the following piece of code (note: sc is the SparkContext):

var combined: org.apache.spark.rdd.RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)   // RDD[String] with the lines of one file
  if (combined != null) {
    combined = combined.union(fileRdd)
  } else {
    combined = fileRdd
  }
}

Now you have a single unified RDD, i.e. combined.
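
The files list itself can be built up front, for example by walking the drivers directory with the Hadoop FileSystem API. This is only a sketch under the assumption that the data sits on a filesystem reachable through sc.hadoopConfiguration; the names driverRoot and files are just placeholders:

import org.apache.hadoop.fs.{FileSystem, Path}

val driverRoot = new Path("drivers")
val fs = FileSystem.get(sc.hadoopConfiguration)

// List the per-driver subdirectories, then the trip CSVs inside each one
val files: Seq[String] =
  fs.listStatus(driverRoot)
    .filter(_.isDirectory)
    .flatMap(dir => fs.listStatus(dir.getPath))
    .map(_.getPath.toString)
    .toSeq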