I want to use `intersection()` by key or `filter()` in Spark, but I don't know how to use `intersection()` by key. So I tried `filter()`, but it didn't work.
Example - here are two RDDs:

```scala
data1 // RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 // RDD[(String, Int)] = Array(("a", 3), ("b", 5))

val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect // Array[(String, Int)] = Array()
                                    // empty: this compares each String key to the whole RDD data3, which is never true
```
I want to get the (key, value) pairs from `data1` whose keys also appear in `data2`. The result I want is:

```scala
Array(("a", 1), ("a", 2), ("b", 2), ("b", 3))
```

Is there a way to solve this problem using `intersection()` by key or `filter()`?
For your problem, I think `cogroup()` is better suited. The `intersection()` method considers both keys and values in your data, and so results in an empty RDD.

The function `cogroup()` groups the values of both RDDs by key and gives us `(key, vals1, vals2)`, where `vals1` and `vals2` contain the values of `data1` and `data2` respectively, for each key. Note that if a certain key is not shared by both datasets, one of `vals1` or `vals2` will be returned as an empty `Seq`, so we first have to filter out these tuples to arrive at the intersection of the two RDDs.

Next, we grab `vals1` - which contains the values from `data1` for the common keys - and convert it to the format `(key, Array)`. Lastly we use `flatMapValues()` to unpack the result into the format `(key, value)`.
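A minimal sketch of that pipeline might look like this (strictly speaking, `cogroup()` yields the two value groups as a nested pair of `Iterable`s, `(key, (vals1, vals2))`, and keeping the `Iterable` works just as well as converting to an `Array`):

```scala
// Sketch of the cogroup() approach, using data1 and data2 from the question.
val result = data1.cogroup(data2)          // RDD[(String, (Iterable[Int], Iterable[Int]))]
  .filter { case (_, (vals1, vals2)) =>
    vals1.nonEmpty && vals2.nonEmpty       // keep only keys present in both RDDs
  }
  .mapValues { case (vals1, _) => vals1 }  // keep the values that came from data1
  .flatMapValues(identity)                 // unpack each group back into (key, value) pairs

result.collect() // Array((a,1), (a,2), (b,2), (b,3))
```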
A few options:

1. a broadcast variable in `filter()` - limited scalability, since `data2`'s keys must fit in memory (see the sketch after this list)
2. `cogroup` (similar to group by key)
3. using an inner join
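For option 1, a sketch might look like the following, assuming `sc` is the `SparkContext` and `data2`'s key set is small enough to collect to the driver:

```scala
// Option 1 (sketch): broadcast data2's keys and filter data1 against them.
val keySet = sc.broadcast(data2.keys.collect().toSet)
val result = data1.filter { case (k, _) => keySet.value.contains(k) }

result.collect() // Array((a,1), (a,2), (b,2), (b,3))
```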
For option 3, `data1.join(data2)` holds only the pairs with common keys (an inner join).
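A sketch of the join approach; note that `join()` pairs every `data1` value with every `data2` value for the same key, so duplicates would appear if `data2` had repeated keys (it doesn't in this example):

```scala
// Option 3 (sketch): inner join, then keep only the value from data1.
val result = data1.join(data2)        // RDD[(String, (Int, Int))], common keys only
  .mapValues { case (v1, _) => v1 }   // drop the value contributed by data2

result.collect() // Array((a,1), (a,2), (b,2), (b,3))
```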