I have a DataFrame A that contains a column of type array of strings:
...
|-- browse: array (nullable = true)
| |-- element: string (containsNull = true)
...
For example three sample rows would be
+---------+--------+---------+
| column 1| browse| column n|
+---------+--------+---------+
| foo1| [X,Y,Z]| bar1|
| foo2| [K,L]| bar2|
| foo3| [M]| bar3|
I also have another DataFrame B that contains a column of type string:
|-- browsenodeid: string (nullable = true)
Some sample rows for it would be
+------------+
|browsenodeid|
+------------+
| A|
| Z|
| M|
How can I filter A so that I keep all the rows whose browse
contains any of the values of browsenodeid
from B? In terms of the above examples, the result would be:
+---------+--------+---------+
| column 1| browse| column n|
+---------+--------+---------+
| foo1| [X,Y,Z]| bar1| <- because Z is a value of B.browsenodeid
| foo3| [M]| bar3| <- because M is a value of B.browsenodeid
If I had a single value then I would use something like
A.filter(array_contains(A("browse"), single_value))
But what do I do with a list or DataFrame of values?
I found an elegant solution for this, without the need to convert DataFrames/Datasets to RDDs.
Assuming you have a DataFrame dataDF:
+---------+--------+---------+
| column 1| browse| column n|
+---------+--------+---------+
| foo1| [X,Y,Z]| bar1|
| foo2| [K,L]| bar2|
| foo3| [M]| bar3|
and an array b containing the values you want to match in browse:
val b: Array[String] = Array("M", "Z")
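Implement the udf (taking the column as Seq[String] and guarding against null, since browse is nullable):
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
def array_contains_any(s: Seq[String]): UserDefinedFunction = udf((c: Seq[String]) => c != null && c.exists(x => s.contains(x)))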
and then simply use the filter or where function (with a little bit of fancy currying :P) to do the filtering, like:
dataDF.where(array_contains_any(b)($"browse"))
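On Spark 2.4 or later, the same filter can probably be written without a UDF by using the built-in arrays_overlap function. When the values live in another DataFrame (like B in the question), a left semi join on array_contains is one option. Here is a minimal sketch under those assumptions; the object name BrowseFilterSketch and the helpers keepOverlapping and keepMatching are made up for illustration and are not part of Spark.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{arrays_overlap, col, expr, typedLit}

// Hypothetical helper object, for illustration only.
object BrowseFilterSketch {

  // Values held in a local collection: compare the array column against
  // an array literal with the built-in arrays_overlap (Spark 2.4+).
  def keepOverlapping(df: DataFrame, values: Seq[String]): DataFrame =
    df.where(arrays_overlap(col("browse"), typedLit(values)))

  // Values held in another DataFrame: a left semi join keeps each row of `a`
  // at most once, however many rows of `b` it matches.
  def keepMatching(a: DataFrame, b: DataFrame): DataFrame =
    a.join(b, expr("array_contains(browse, browsenodeid)"), "left_semi")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, just for trying it out
      .appName("browse-filter-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample rows taken from the question.
    val dataDF = Seq(
      ("foo1", Seq("X", "Y", "Z"), "bar1"),
      ("foo2", Seq("K", "L"), "bar2"),
      ("foo3", Seq("M"), "bar3")
    ).toDF("column 1", "browse", "column n")
    val bDF = Seq("A", "Z", "M").toDF("browsenodeid")

    keepOverlapping(dataDF, Seq("M", "Z")).show()
    keepMatching(dataDF, bDF).show()

    spark.stop()
  }
}
```

Both calls should keep the foo1 and foo3 rows. Note that the semi join has no equality key, so Spark may have to plan it as a nested loop join; that is likely fine when B is small, but worth checking with explain() on larger inputs.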
Assume the input data for DataFrame A is:
browse
200,300,889,767,9908,7768,9090
300,400,223,4456,3214,6675,333
234,567,890
123,445,667,887
and you have to match it against DataFrame B
browsenodeid (I flattened the column browsenodeid): 123,200,300
val matchSet = "123,200,300".split(",").toSet
val rawrdd = sc.textFile("D:\\Dataframe_A.txt")
rawrdd.map(_.split("\\|")) // escape the pipe: split takes a regex, and a bare "|" splits between every character
.map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(","))
.foreach(println)
Your output:
300,200
300
123
Updated
val matchSet = "A,Z,M".split(",").toSet
val rawrdd = sc.textFile("/FileStore/tables/mvv45x9f1494518792828/input_A.txt")
rawrdd.map(_.split("\\|")) // escape the pipe, because split takes a regex
.filter(r => r(1).split(",").toSet.intersect(matchSet).nonEmpty) // keep only rows whose browse values hit matchSet
.map(r => org.apache.spark.sql.Row(r(0), r(1), r(2))).collect.foreach(println)
Output is
foo1,X,Y,Z,bar1
foo3,M,bar3
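The filtering step can also be tried on a plain Scala collection before running it on an RDD. The sketch below assumes each input line looks like column 1|comma-separated browse values|column n; the input file is not shown above, so that layout is a guess. The object name PipeFilterSketch is made up for illustration. Because String.split takes a regular expression, the pipe is escaped.

```scala
// Hypothetical stand-alone sketch; no Spark needed.
object PipeFilterSketch {
  val matchSet: Set[String] = "A,Z,M".split(",").toSet

  // Assumed line layout: column 1 | browse values separated by commas | column n
  val lines: Seq[String] = Seq(
    "foo1|X,Y,Z|bar1",
    "foo2|K,L|bar2",
    "foo3|M|bar3"
  )

  // True when the second field shares at least one value with matchSet.
  def keep(line: String): Boolean = {
    val fields = line.split("\\|") // "\\|" is a literal pipe
    fields.length > 1 && fields(1).split(",").exists(matchSet.contains)
  }

  val kept: Seq[String] = lines.filter(keep)

  def main(args: Array[String]): Unit = kept.foreach(println)
}
```

The same keep function could be passed to rawrdd.filter, which avoids mapping non-matching rows to an empty value.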