I have a directory structure on S3 looking like this:
foo
|-base
|  |-2017
|  |  |-01
|  |  |  |-04
|  |  |  |  |-part1.orc, part2.orc ....
|-A
|  |-2017
|  |  |-01
|  |  |  |-04
|  |  |  |  |-part1.orc, part2.orc ....
|-B
|  |-2017
|  |  |-01
|  |  |  |-04
|  |  |  |  |-part1.orc, part2.orc ....
Meaning that for directory foo I have multiple output tables, base, A, B, etc. in a given path based on the timestamp of a job.
I'd like to left join them all, based on a timestamp and the master directory, in this case foo. This would mean reading each output table base, A, B, etc. into new separate input tables on which a left join can be applied, all with the base table as the starting point.
Something like this (not working code!):
val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*")
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*")
val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left"))
Can someone point me in the right direction on how to get that sequence of DataFrames? It might even be worth considering the reads as lazy, or sequential, thus only reading the A or B table when the join is applied, to reduce memory requirements.
Note: the directory structure is not final, meaning it can change if that fits the solution.
From what I understand, Spark uses the underlying Hadoop API to read in data files, so the inherited behavior is to read everything you specify into one single RDD/DataFrame.
To achieve what you want, you can first get a list of directories with:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val path = "foo/"

// note: FileSystem.get(conf) returns the default filesystem; for an s3:// path
// you may need FileSystem.get(new URI(path), hadoopConf) instead
val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)

// list the immediate subdirectories of foo/ (base, A, B, ...)
val paths: Array[String] = fs.listStatus(new Path(path))
  .filter(_.isDirectory)
  .map(_.getPath.toString)
Then load them into separate DataFrames:
// read each table's data for the given date into its own DataFrame
val dfs: Array[DataFrame] = paths
  .map(path => spark.read.orc(path + "/2017/01/04/*"))
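From there, a minimal sketch of the fold-join the question describes could look like the following. This assumes every table has an "id" column and that the base directory should be excluded from the sequence so it isn't joined with itself; the names basePath and others are just illustrative:
// split out base from the other tables before joining
val basePath = path + "base/2017/01/04/*"
val baseDF: DataFrame = spark.read.orc(basePath)

val others: Array[DataFrame] = paths
  .filter(!_.endsWith("base"))                      // don't join base with itself
  .map(p => spark.read.orc(p + "/2017/01/04/*"))

// left join everything onto base, assuming a common "id" column in every table
val result: DataFrame = others.foldLeft(baseDF)((l, r) => l.join(r, Seq("id"), "left"))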
Here's a straightforward solution to what (I think) you're trying to do, with no use of extra features like Hive or built-in partitioning abilities:
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

import spark.implicits._

// load base
val baseDF = spark.read.orc("foo/base/2017/01/04").as("base")

// create or use existing Hadoop FileSystem - this should use the actual config and path
val fs = FileSystem.get(new URI("."), new Configuration())

// find all other subfolders under foo/
val otherFolderPaths = fs.listStatus(new Path("foo/"), new PathFilter {
  override def accept(path: Path): Boolean = path.getName != "base"
}).map(_.getPath)

// use foldLeft to join all, using the DF aliases to find the right "id" column
val result = otherFolderPaths.foldLeft(baseDF) { (df, path) =>
  df.join(spark.read.orc(s"$path/2017/01/04").as(path.getName), $"base.id" === $"${path.getName}.id", "left")
}
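As a usage note: each spark.read.orc call inside the fold only builds a lazy plan, so the bulk of the A, B, ... data is not read until an action is triggered on the result, which roughly gives you the lazy/sequential behaviour asked for. A quick way to inspect the combined plan and kick off the computation:
result.explain()   // inspect the chained left-join plan before anything heavy runs
result.show(10)    // triggers the actual scan of each folder and the joins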