对于一组dataframes的
val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")
工会所有的人我做
df1.unionAll(df2).unionAll(df3)
是否有这样做的任何数量的dataframes的更优雅和可扩展的方式,例如从
Seq(df1, df2, df3)
最简单的解决方案是reduce
与union
( unionAll
火花<2.0):
val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)
这是相对简明的,不应从离堆存储移动数据,但是延伸谱系与每个工会需要进行计划分析非线性时间。 如果您尝试合并大量的东西可能是一个问题DataFrames
。
你也可以转换到RDDs
和使用SparkContext.union
:
dfs match {
case h :: Nil => Some(h)
case h :: _ => Some(h.sqlContext.createDataFrame(
h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
h.schema
))
case Nil => None
}
它保持血统简短分析成本低,但否则它比合并效率较低DataFrames
直接。
对于pyspark你可以做到以下几点:
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)
这也是一文不值,在dataframes列的顺序应该是相同的这个工作。 这可以默默地产生意外的结果,如果你没有正确的列订单!
如果你正在使用pyspark 2.3或更高版本,可以使用unionByName,所以你不必重新排序的列。
发动机罩下的火花变平联合表达式。 因此,当联盟线性完成需要更长的时间。
最好的解决办法是火花,有一个支持多个DataFrames工会功能。
但是,下面的代码可能会加速多种DataFrames(或数据集)稍微的工会。
def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
binaryReduce[Dataset[T]](datasets, _.union(_))
}
def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
if (ts.isEmpty) {
throw new IllegalArgumentException
}
var array = ts toArray
var size = array.size
while(size > 1) {
val newSize = (size + 1) / 2
for (i <- 0 until newSize) {
val index = i*2
val index2 = index + 1
if (index2 >= size) {
array(i) = array(index) // last remaining
} else {
array(i) = op(array(index), array(index2))
}
}
size = newSize
}
array(0)
}