What is going wrong with `unionAll` of Spark `Data

2020-01-26 08:56发布

问题:

Using Spark 1.5.0 and given the following code, I expect unionAll to union DataFrames based on their column name. In the code, I'm using some FunSuite for passing in SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

Output:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

Why does the result contain intermixed "b" and "a" columns, instead of aligning columns bases on column names? Sounds like a serious bug!?

回答1:

It doesn't look like a bug at all. What you see is a standard SQL behavior and every major RDMBS, including PostgreSQL, MySQL, Oracle and MS SQL behaves exactly the same. You'll find SQL Fiddle examples linked with names.

To quote PostgreSQL manual:

In order to calculate the union, intersection, or difference of two queries, the two queries must be "union compatible", which means that they return the same number of columns and the corresponding columns have compatible data types

Column names, excluding the first table in the set operation, are simply ignored.

This behavior comes directly form the Relational Algebra where basic building block is a tuple. Since tuples are ordered an union of two sets of tuples is equivalent (ignoring duplicates handling) to the output you get here.

If you want to match using names you can do something like this

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

To check both names and types it is should be enough to replace columns with:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq


回答2:

This issue is getting fixed in spark2.3. They are adding support of unionByName in the dataset.

https://issues.apache.org/jira/browse/SPARK-21043


回答3:

no issues/bugs - if you observe your case class B very closely then you will be clear. Case Class A --> you have mentioned the order (a,b), and Case Class B --> you have mentioned the order (b,a) ---> this is expected as per order

case class A (a: Int, b: Int) case class B (b: Int, a: Int)

thanks, Subbu



回答4:

Use unionByName:

Excerpt from the documentation:

def unionByName(other: Dataset[T]): Dataset[T]

The difference between this function and union is that this function resolves columns by name (not by position):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+


回答5:

As discussed in SPARK-9813, it seems like as long as the data types and number of columns are the same across frames, the unionAll operation should work. Please see the comments for additional discussion.