Spark read multiple directories into multiple DataFrames

Posted 2019-04-13 07:26

Question:

I have a directory structure on S3 looking like this:

foo
  |-base
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-A
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-B
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....

Meaning that for directory foo I have multiple output tables, base, A, B, etc., each under a path based on the timestamp of a job.

I'd like to left join them all, based on a timestamp and the master directory, in this case foo. This would mean reading each output table base, A, B, etc. into separate DataFrames to which a left join can be applied, all with the base table as the starting point.

Something like this (not working code!)

val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*")
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*")

val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left"))

Can someone point me in the right direction on how to get that sequence of DataFrames? It might even be worth making the reads lazy or sequential, so that the A or B table is only read when the join is applied, to reduce memory requirements.

Note: the directory structure is not final, meaning it can change if that fits the solution.

Answer 1:

From what I understand, Spark uses the underlying Hadoop API to read in data files, so the inherited behavior is to read everything you specify into one single RDD/DataFrame.
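
For example (a quick illustration of that point, not from the original answer), a single glob read across all subfolders yields one combined DataFrame rather than one DataFrame per directory:

    // one glob pattern => one DataFrame whose rows come from base, A, B, ... mixed together
    val combined = spark.read.orc("foo/*/2017/01/04/*")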

To achieve what you want, you can first get a list of directories with:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }

    val path = "foo/"

    // list the immediate subdirectories of foo/ (base, A, B, ...)
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    val paths: Array[String] = fs.listStatus(new Path(path)).
      filter(_.isDirectory).
      map(_.getPath.toString)

Then load them into separate DataFrames:

    val dfs: Array[DataFrame] = paths.
      map(path => spark.read.orc(path + "/2017/01/04/*"))
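
From there, the left joins described in the question can be applied with a foldLeft. A minimal sketch (my addition, not part of the original answer), assuming every table shares an id column and excluding base from the list so it is not joined onto itself:

    // read base separately, then fold the remaining tables onto it with left joins on "id"
    val base: DataFrame = spark.read.orc(path + "base/2017/01/04/*")

    val otherDfs: Array[DataFrame] = paths.
      filterNot(p => new Path(p).getName == "base").
      map(p => spark.read.orc(p + "/2017/01/04/*"))

    // joining on Seq("id") keeps a single id column in the result
    val result: DataFrame = otherDfs.foldLeft(base)((l, r) => l.join(r, Seq("id"), "left"))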


Answer 2:

Here's a straightforward solution to what (I think) you're trying to do, without using extra features like Hive or built-in partitioning abilities:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path, PathFilter }

import spark.implicits._

// load base
val baseDF = spark.read.orc("foo/base/2017/01/04").as("base")

// create or use existing Hadoop FileSystem - this should use the actual config and path
val fs = FileSystem.get(new URI("."), new Configuration())

// find all other subfolders under foo/
val otherFolderPaths = fs.listStatus(new Path("foo/"), new PathFilter {
  override def accept(path: Path): Boolean = path.getName != "base"
}).map(_.getPath)

// use foldLeft to join all, using the DF aliases to find the right "id" column
val result = otherFolderPaths.foldLeft(baseDF) { (df, path) =>
  df.join(spark.read.orc(s"$path/2017/01/04").as(path.getName), $"base.id" === $"${path.getName}.id", "left")
}
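
One follow-up note (my own addition, not from the original answer): because the join condition compares base.id with each table's own aliased id, every joined table keeps its id column in result. If only the base key is needed, the aliased copies can be dropped afterwards, for example:

// drop each table's aliased "id" copy, keeping only base.id (assumes the aliases set above)
val trimmed = otherFolderPaths.map(_.getName).foldLeft(result) { (df, name) =>
  df.drop($"$name.id")
}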