I have two RDDs of key-value pairs. I want to join them by key (and, for each key, get the cartesian product of all values), which I assumed could be done with the `zip()` function of PySpark. However, when I apply this,
elemPairs = elems1.zip(elems2).reduceByKey(add)
It gives me the error:
Cannot deserialize RDD with different number of items in pair: (40, 10)
And here are the two RDDs I am trying to zip:
elems1 => [((0, 0), ('A', 0, 90)), ((0, 1), ('A', 0, 90)), ((0, 2), ('A', 0, 90)), ((0, 3), ('A', 0, 90)), ((0, 4), ('A', 0, 90)), ((0, 0), ('A', 1, 401)), ((0, 1), ('A', 1, 401)), ((0, 2), ('A', 1, 401)), ((0, 3), ('A', 1, 401)), ((0, 4), ('A', 1, 401)), ((1, 0), ('A', 0, 305)), ((1, 1), ('A', 0, 305)), ((1, 2), ('A', 0, 305)), ((1, 3), ('A', 0, 305)), ((1, 4), ('A', 0, 305)), ((1, 0), ('A', 1, 351)), ((1, 1), ('A', 1, 351)), ((1, 2), ('A', 1, 351)), ((1, 3), ('A', 1, 351)), ((1, 4), ('A', 1, 351)), ((2, 0), ('A', 0, 178)), ((2, 1), ('A', 0, 178)), ((2, 2), ('A', 0, 178)), ((2, 3), ('A', 0, 178)), ((2, 4), ('A', 0, 178)), ((2, 0), ('A', 1, 692)), ((2, 1), ('A', 1, 692)), ((2, 2), ('A', 1, 692)), ((2, 3), ('A', 1, 692)), ((2, 4), ('A', 1, 692)), ((3, 0), ('A', 0, 936)), ((3, 1), ('A', 0, 936)), ((3, 2), ('A', 0, 936)), ((3, 3), ('A', 0, 936)), ((3, 4), ('A', 0, 936)), ((3, 0), ('A', 1, 149)), ((3, 1), ('A', 1, 149)), ((3, 2), ('A', 1, 149)), ((3, 3), ('A', 1, 149)), ((3, 4), ('A', 1, 149))]
elems2 => [((0, 0), ('B', 0, 573)), ((1, 0), ('B', 0, 573)), ((2, 0), ('B', 0, 573)), ((3, 0), ('B', 0, 573)), ((4, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324)), ((1, 0), ('B', 1, 324)), ((2, 0), ('B', 1, 324)), ((3, 0), ('B', 1, 324)), ((4, 0), ('B', 1, 324))]
Where, in `((0, 0), ('B', 0, 573))`, the tuple `(0, 0)` is the key and `('B', 0, 573)` is the value.
After a quick Google search, I found that this was a problem that only occurred in Spark 1.2; however, I am using Spark 1.5.
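For reference, the mismatch is easy to reproduce locally: like the RDD method, Python's built-in `zip` pairs elements purely by position, never by key. A minimal plain-Python sketch (not PySpark) using a shortened version of the data above:

```python
# Plain-Python sketch (not PySpark): zip pairs elements by position, not by key.
a = [((0, 0), ('A', 0, 90)), ((0, 1), ('A', 0, 90)), ((1, 0), ('A', 0, 305))]
b = [((0, 0), ('B', 0, 573)), ((1, 0), ('B', 0, 573))]

pairs = list(zip(a, b))
# Built-in zip silently truncates to the shorter input (2 pairs here),
# and pairs[1] matches key (0, 1) with key (1, 0) -- the keys are ignored.
# RDD.zip is stricter: it raises an error when the element counts differ,
# which is exactly the "different number of items in pair: (40, 10)" above.
```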
The reason for that error message is described in the RDD API. As @alwaysprep said, you could use `join`, as `zip` does something completely different: `zip` pairs the n-th element of one RDD with the n-th element of the other, hence the two RDDs must have the same size (in PySpark, even the same number of elements in each partition). In your case, `elems1` contains more elements than `elems2`, so maybe you can have a look at `rightOuterJoin` (or `leftOuterJoin`): a plain `.join` skips elements whose key is not present in both RDDs. For example, I see that the key `(4, 0)` is only present in `elems2`; if you `join` them, it will be skipped, as it is not found in `elems1`. Additionally, there is the method `.cartesian`, if you really want the full cartesian product.

Why not just use `elems1.join(elems2)`?
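To illustrate what `join` computes, here is a plain-Python sketch of the per-key cartesian product (not PySpark; the helper name `join_by_key` is made up for illustration), including how a plain join drops keys that appear on only one side:

```python
from collections import defaultdict
from itertools import product

def join_by_key(left_pairs, right_pairs):
    """Mimic RDD.join locally: for every key present in BOTH inputs,
    emit each (left_value, right_value) combination -- a per-key
    cartesian product. Keys missing from either side are dropped."""
    left, right = defaultdict(list), defaultdict(list)
    for k, v in left_pairs:
        left[k].append(v)
    for k, v in right_pairs:
        right[k].append(v)
    return [(k, (lv, rv))
            for k in left if k in right
            for lv, rv in product(left[k], right[k])]

elems1 = [((0, 0), ('A', 0, 90)), ((0, 0), ('A', 1, 401)), ((1, 0), ('A', 0, 305))]
elems2 = [((0, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324)), ((4, 0), ('B', 0, 573))]

joined = join_by_key(elems1, elems2)
# Key (0, 0) has two values on each side -> four joined pairs.
# (1, 0) and (4, 0) appear on only one side, so they are dropped;
# rightOuterJoin / leftOuterJoin would instead keep them, filling the
# missing side with None.
```

In Spark itself this is simply `elems1.join(elems2)`; use `rightOuterJoin` if you need to keep keys such as `(4, 0)` that exist only in `elems2`.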