How to filter a Spark DataFrame by an array column containing values from another DataFrame

Published 2019-02-19 12:02

Question:

I have a Dataframe A that contains a column of array string.

...
 |-- browse: array (nullable = true)
 |    |-- element: string (containsNull = true)
...

For example, three sample rows would be:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|
+---------+--------+---------+

And another Dataframe B that contains a column of string

|-- browsenodeid: string (nullable = true)

Some sample rows for it would be

+------------+
|browsenodeid|
+------------+
|           A|
|           Z|
|           M|
+------------+

How can I filter A so that I keep all the rows whose browse contains any of the values of browsenodeid from B? In terms of the above examples, the result would be:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1| <- because Z is a value of B.browsenodeid
|     foo3|     [M]|     bar3| <- because M is a value of B.browsenodeid
+---------+--------+---------+

If I had a single value then I would use something like

import org.apache.spark.sql.functions.array_contains

A.filter(array_contains(A("browse"), single_value))

But what do I do with a list or DataFrame of values?

Answer 1:

I found an elegant solution for this, without the need to convert DataFrames/Datasets to RDDs.

Assuming you have a DataFrame dataDF:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|
+---------+--------+---------+

and an array b containing the values you want to match against browse:

val b: Array[String] = Array("M", "Z")
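
(If your match values actually live in a DataFrame B, as in the question, here is a small sketch of one way to build that array, assuming B is small enough to collect to the driver:)

// Pull the browsenodeid values from B onto the driver.
val b: Array[String] = B.select("browsenodeid").collect().map(_.getString(0))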

Implement the UDF (imports included so it compiles as-is):

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// True when the array column shares at least one element with s.
def array_contains_any(s: Seq[String]): UserDefinedFunction =
  udf((c: WrappedArray[String]) => c.toList.intersect(s).nonEmpty)

and then simply use the filter or where function (with a little bit of fancy currying :P) to do the filtering like:

dataDF.where(array_contains_any(b)($"browse"))
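
On Spark 2.4+ you can also skip the UDF entirely: the built-in arrays_overlap function performs the same any-element-in-common test against a literal array column. A minimal sketch, assuming the same dataDF and b as above:

import org.apache.spark.sql.functions.{array, arrays_overlap, lit}

// Build a literal array column from b, then keep rows whose browse
// shares at least one element with it ($"browse" assumes spark.implicits._).
dataDF.where(arrays_overlap($"browse", array(b.map(lit): _*))).show()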


Answer 2:

Assume the following input data in Dataframe A:

browse
200,300,889,767,9908,7768,9090
300,400,223,4456,3214,6675,333
234,567,890
123,445,667,887

and you have to match it with Dataframe B

browsenodeid (I flattened the column browsenodeid): 123,200,300

val matchSet = "123,200,300".split(",").toSet
val rawrdd = sc.textFile("D:\\Dataframe_A.txt")
rawrdd.map(_.split("\\|"))   // escape the pipe: String.split takes a regex
      .map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(","))
      .foreach(println)

Your output:

300,200
300
123

Updated

val matchSet = "A,Z,M".split(",").toSet

val rawrdd = sc.textFile("/FileStore/tables/mvv45x9f1494518792828/input_A.txt")

rawrdd.map(_.split("\\|"))   // escape the pipe: String.split takes a regex
      .filter(r => r(1).split(",").toSet.intersect(matchSet).nonEmpty)
      .map(r => org.apache.spark.sql.Row(r(0), r(1), r(2)))
      .collect
      .foreach(println)

Output is

foo1,X,Y,Z,bar1
foo3,M,bar3
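
If you need the result back as a DataFrame rather than printed rows, here is a minimal sketch building on the RDD above (the schema and column names are illustrative, and a SparkSession named spark is assumed):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Keep rows whose browse values overlap matchSet, then attach a schema.
val filteredRdd = rawrdd.map(_.split("\\|"))
  .filter(r => r(1).split(",").toSet.intersect(matchSet).nonEmpty)
  .map(r => Row(r(0), r(1), r(2)))

val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("browse", StringType),
  StructField("columnN", StringType)))

spark.createDataFrame(filteredRdd, schema).show()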