PySpark, intersection by Key

Published 2019-02-27 23:45

Question:

For example, I have two RDDs in PySpark:

((0,0), 1)
((0,1), 2)
((1,0), 3)
((1,1), 4)

and the second is just

((0,1), 3)
((1,1), 0)

I want to take the intersection of the first RDD with the second. In effect, the second RDD has to play the role of a mask for the first. The output should be:

((0,1), 2)
((1,1), 4)

That is, the values from the first RDD, but only for the keys that appear in the second. The lengths of the two RDDs are different.

I have a solution (still have to verify it), something like this:

rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda pair: pair[0][0] == pair[1][0])  # keep pairs whose keys match
rdd5 = rdd4.map(lambda pair: pair[0])                      # keep the (key, value) element from rdd1
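
To check it on the sample data above (assuming rdd1 and rdd2 are built with sc.parallelize):

print(rdd5.collect())  # expected: [((0, 1), 2), ((1, 1), 4)] (order may vary)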

I don't know how efficient this solution is; I would like to hear the opinion of experienced Spark programmers.

Answer 1:

Perhaps we shouldn't think of this process as a join. You're not really looking to join two datasets; you're looking to subtract one dataset from the other.

I'm going to state what I am assuming from your question:

  1. You don't care about the values in the second dataset, at all.
  2. You only want to keep the entries in the first dataset whose key appears in the second dataset.

Idea 1: cogroup (I think this is probably the fastest way). It basically computes the intersection of the two datasets by key.

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])  # keep only keys present in both RDDs
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda kv: (kv[0], kv[1][0]))  # take rdd1's value
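
A quick check on the sample data (the ordering may differ from run to run):

print(final_rdd.collect())  # [((0, 1), 2), ((1, 1), 4)]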

Idea 2: subtractByKey

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])

unwanted_rows = rdd1.subtractByKey(rdd2)         # keys of rdd1 that are NOT in rdd2
wanted_rows = rdd1.subtractByKey(unwanted_rows)  # keys of rdd1 that ARE in rdd2
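
Again, a quick check on the sample data (note the ordering, discussed below):

print(wanted_rows.collect())  # e.g. [((1, 1), 4), ((0, 1), 2)]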

I'm not 100% sure whether this is faster than your method. It does require two subtractByKey operations, which can be slow. Also, this method does not preserve order (e.g. ((0, 1), 2), despite being first in your first dataset, comes second in the final dataset), but I can't imagine that matters.

As to which is faster, I think it depends on how long your cartesian join takes. Mapping and filtering tend to be faster than the shuffle operations needed by subtractByKey, but of course cartesian is a time-consuming process.

Anyway, I figure you can try out this method and see if it works for you!


A side note on performance improvements, depending on how large your RDDs are.

If rdd1 is small enough to be held in main memory, the subtraction process can be sped up immensely if you broadcast it and then stream rdd2 against it. However, I acknowledge that this is rarely the case.
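
A minimal sketch of that idea, adapted to this question by broadcasting the keys of the small mask RDD (rdd2) and filtering rdd1 against them (variable names are just for illustration):

mask_keys = sc.broadcast(set(rdd2.keys().collect()))            # assumes rdd2's keys fit in memory
wanted_rows = rdd1.filter(lambda kv: kv[0] in mask_keys.value)  # keep rdd1 entries whose key is in the mask

This avoids a shuffle entirely, since the filter runs locally on each partition of rdd1.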



Tags: pyspark rdd