星火unionAll多个dataframes星火unionAll多个dataframes(Spark

2019-05-12 05:00发布

对于一组dataframes的

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

工会所有的人我做

df1.unionAll(df2).unionAll(df3)

是否有这样做的任何数量的dataframes的更优雅和可扩展的方式,例如从

Seq(df1, df2, df3) 

Answer 1:

最简单的解决方案是reduceunionunionAll火花<2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)

这是相对简明的,不应从离堆存储移动数据,但是延伸谱系与每个工会需要进行计划分析非线性时间。 如果您尝试合并大量的东西可能是一个问题DataFrames

你也可以转换到RDDs和使用SparkContext.union

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

它保持血统简短分析成本低,但否则它比合并效率较低DataFrames直接。



Answer 2:

对于pyspark你可以做到以下几点:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

这也是一文不值,在dataframes列的顺序应该是相同的这个工作。 这可以默默地产生意外的结果,如果你没有正确的列订单!

如果你正在使用pyspark 2.3或更高版本,可以使用unionByName,所以你不必重新排序的列。



Answer 3:

发动机罩下的火花变平联合表达式。 因此,当联盟线性完成需要更长的时间。

最好的解决办法是火花,有一个支持多个DataFrames工会功能。

但是,下面的代码可能会加速多种DataFrames(或数据集)稍微的工会。

  def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
      binaryReduce[Dataset[T]](datasets, _.union(_))
  }
  def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
      if (ts.isEmpty) {
         throw new IllegalArgumentException
      }
      var array = ts toArray
      var size = array.size
      while(size > 1) {
         val newSize = (size + 1) / 2
         for (i <- 0 until newSize) {
             val index = i*2
             val index2 = index + 1
             if (index2 >= size) {
                array(i) = array(index)  // last remaining
             } else {
                array(i) = op(array(index), array(index2))
             }
         }
         size = newSize
     }
     array(0)
 }


文章来源: Spark unionAll multiple dataframes