How to use Column.isin with array column in join?

Posted 2020-02-05 10:28

Question:

import spark.implicits._  // needed for .toDS on local Seqs

case class Foo1(codes:Seq[String], name:String)
case class Foo2(code:String, description:String)

val ds1 = Seq(
  Foo1(Seq("A"),           "foo1"),
  Foo1(Seq("A", "B"),      "foo2"),
  Foo1(Seq("B", "C", "D"), "foo3"),
  Foo1(Seq("C"),           "foo4"),
  Foo1(Seq("C", "D"),      "foo5")
).toDS

val ds2 = Seq(
  Foo2("A", "product A"),
  Foo2("B", "product B"),
  Foo2("C", "product C"),
  Foo2("D", "product D"),
  Foo2("E", "product E")
).toDS

val j = ds1.join(ds2, ds2("code") isin (ds1("codes")))

Hopefully this Scala code fragment makes it clear what I'm trying to accomplish. Our data is structured so that one data set has a column containing an array of values, and I want to join the values within that collection to another data set. So, for example, Seq("A", "B") in ds1 would join with "A" and "B" in ds2.

The "isin" operator on Column seems to be exactly what I want, and this builds and runs, but when I run it I get the following error:

org.apache.spark.sql.AnalysisException: cannot resolve '(code IN (codes))' due to data type mismatch: Arguments must be same type;;

Reading further, I see that isin() takes varargs ("splatted args") and seems better suited to a filter(). So my question is: is this the intended use of this operator, or is there some other way to perform this type of join?
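
(For reference, the usage isin seems designed for is filtering a column against a fixed list of literal values, e.g.:

ds2.filter($"code".isin("A", "B")).show

which is not what I need here, since the allowed values come from another Dataset's array column.)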

Answer 1:

Please use array_contains:

ds1.crossJoin(ds2).where("array_contains(codes, code)").show

+---------+----+----+-----------+
|    codes|name|code|description|
+---------+----+----+-----------+
|      [A]|foo1|   A|  product A|
|   [A, B]|foo2|   A|  product A|
|   [A, B]|foo2|   B|  product B|
|[B, C, D]|foo3|   B|  product B|
|[B, C, D]|foo3|   C|  product C|
|[B, C, D]|foo3|   D|  product D|
|      [C]|foo4|   C|  product C|
|   [C, D]|foo5|   C|  product C|
|   [C, D]|foo5|   D|  product D|
+---------+----+----+-----------+
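
The same predicate can also be written with the Column API instead of a SQL expression string; this sketch assumes a Spark version whose array_contains accepts a Column as its second argument:

import org.apache.spark.sql.functions.array_contains

ds1.crossJoin(ds2)
  .where(array_contains(ds1("codes"), ds2("code")))
  .show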

If you use Spark 1.x or 2.0, replace crossJoin with a standard join and, if necessary, enable cross joins in the configuration (spark.sql.crossJoin.enabled).
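
For example, on Spark 2.0 (where crossJoin is not yet available) a minimal sketch, assuming an active SparkSession named spark, looks like:

// Allow the implicit Cartesian product (Spark 2.0 rejects it by default).
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// join without a condition produces the cross product; filter it afterwards.
ds1.join(ds2).where("array_contains(codes, code)").show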

It might be possible to avoid the Cartesian product with explode:

import org.apache.spark.sql.functions.explode

ds1.withColumn("code", explode($"codes")).join(ds2, Seq("code")).show

+----+---------+----+-----------+
|code|    codes|name|description|
+----+---------+----+-----------+
|   B|   [A, B]|foo2|  product B|
|   B|[B, C, D]|foo3|  product B|
|   D|[B, C, D]|foo3|  product D|
|   D|   [C, D]|foo5|  product D|
|   C|[B, C, D]|foo3|  product C|
|   C|      [C]|foo4|  product C|
|   C|   [C, D]|foo5|  product C|
|   A|      [A]|foo1|  product A|
|   A|   [A, B]|foo2|  product A|
+----+---------+----+-----------+
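
Note that explode drops Foo1 rows whose codes array is empty or null. If those rows need to be kept, one option (a sketch assuming Spark 2.2+ for explode_outer) is:

import org.apache.spark.sql.functions.explode_outer

// explode_outer emits a null code for empty/null arrays, and the left join
// keeps those rows with a null description.
ds1.withColumn("code", explode_outer($"codes"))
  .join(ds2, Seq("code"), "left")
  .show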