Cannot deserialize RDD with different number of items in pair


Question:

I have two RDDs of key-value pairs. I want to join them by key (and, for each key, get the Cartesian product of all values), which I assumed could be done with PySpark's zip() function. However, when I apply this:

elemPairs = elems1.zip(elems2).reduceByKey(add)

It gives me the error:

Cannot deserialize RDD with different number of items in pair: (40, 10)

And here are the two RDDs which I am trying to zip:

elems1 => [((0, 0), ('A', 0, 90)), ((0, 1), ('A', 0, 90)), ((0, 2), ('A', 0, 90)), ((0, 3), ('A', 0, 90)), ((0, 4), ('A', 0, 90)), ((0, 0), ('A', 1, 401)), ((0, 1), ('A', 1, 401)), ((0, 2), ('A', 1, 401)), ((0, 3), ('A', 1, 401)), ((0, 4), ('A', 1, 401)), ((1, 0), ('A', 0, 305)), ((1, 1), ('A', 0, 305)), ((1, 2), ('A', 0, 305)), ((1, 3), ('A', 0, 305)), ((1, 4), ('A', 0, 305)), ((1, 0), ('A', 1, 351)), ((1, 1), ('A', 1, 351)), ((1, 2), ('A', 1, 351)), ((1, 3), ('A', 1, 351)), ((1, 4), ('A', 1, 351)), ((2, 0), ('A', 0, 178)), ((2, 1), ('A', 0, 178)), ((2, 2), ('A', 0, 178)), ((2, 3), ('A', 0, 178)), ((2, 4), ('A', 0, 178)), ((2, 0), ('A', 1, 692)), ((2, 1), ('A', 1, 692)), ((2, 2), ('A', 1, 692)), ((2, 3), ('A', 1, 692)), ((2, 4), ('A', 1, 692)), ((3, 0), ('A', 0, 936)), ((3, 1), ('A', 0, 936)), ((3, 2), ('A', 0, 936)), ((3, 3), ('A', 0, 936)), ((3, 4), ('A', 0, 936)), ((3, 0), ('A', 1, 149)), ((3, 1), ('A', 1, 149)), ((3, 2), ('A', 1, 149)), ((3, 3), ('A', 1, 149)), ((3, 4), ('A', 1, 149))]

elems2 => [((0, 0), ('B', 0, 573)), ((1, 0), ('B', 0, 573)), ((2, 0), ('B', 0, 573)), ((3, 0), ('B', 0, 573)), ((4, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324)), ((1, 0), ('B', 1, 324)), ((2, 0), ('B', 1, 324)), ((3, 0), ('B', 1, 324)), ((4, 0), ('B', 1, 324))]

Where, in ((0, 0), ('B', 0, 573)), (0, 0) is the key and ('B', 0, 573) is the value.

After a quick Google search, I found that this is a problem which only occurs in Spark 1.2; however, I am using Spark 1.5.

Answer 1:

Why not just use elems1.join(elems2)?
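
A minimal PySpark sketch of what that looks like (assuming a local SparkContext; the variable names come from the question, and the sample rows are a small subset of the data above):

from pyspark import SparkContext

sc = SparkContext("local", "join-example")

# Small subsets of the two RDDs from the question: (key, value) pairs
elems1 = sc.parallelize([((0, 0), ('A', 0, 90)), ((0, 0), ('A', 1, 401))])
elems2 = sc.parallelize([((0, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324))])

# join matches records by key; when a key occurs several times in both
# RDDs it emits every combination of values, i.e. exactly the per-key
# Cartesian product the question asks for
print(elems1.join(elems2).collect())
# (order may vary)
# [((0, 0), (('A', 0, 90), ('B', 0, 573))),
#  ((0, 0), (('A', 0, 90), ('B', 1, 324))),
#  ((0, 0), (('A', 1, 401), ('B', 0, 573))),
#  ((0, 0), (('A', 1, 401), ('B', 1, 324)))]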



Answer 2:

The reason for that error message is described in the RDD API documentation:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

As @alwaysprep said, you could use join, as zip does something completely different:

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), ...

As you can see, zip associates the n-th element of a with the n-th element of b, hence the two RDDs must have the same number of elements (and the same number of partitions).
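
The failure mode from the question can be reproduced the same way. A small PySpark sketch, zipping two RDDs of different sizes (run in a pyspark shell where sc already exists; the exact error text varies by Spark version):

a = sc.parallelize(range(40), 2)
b = sc.parallelize(range(10), 2)
# Fails at collect time because the partitions do not line up
# element-for-element; in PySpark this surfaces as an error such as
# "Cannot deserialize RDD with different number of items in pair: (40, 10)"
a.zip(b).collect()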

In your case, elems1 contains more elements than elems2, so zip cannot work; have a look at join, or at rightOuterJoin (or leftOuterJoin) if you need to keep unmatched keys, because .join skips keys that are not present in both RDDs. For example, I see that the key (4, 0) is only present in elems2; an inner join will drop it, as it has no match in elems1. Additionally, there is the .cartesian method, if you really want the full Cartesian product.
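
A hedged PySpark sketch of that difference, using the key (4, 0) that appears only in elems2 (SparkContext setup as in the first answer's sketch):

elems1 = sc.parallelize([((0, 0), ('A', 0, 90))])
elems2 = sc.parallelize([((0, 0), ('B', 0, 573)), ((4, 0), ('B', 0, 573))])

# An inner join drops the key (4, 0): it has no match in elems1
print(elems1.join(elems2).collect())
# [((0, 0), (('A', 0, 90), ('B', 0, 573)))]

# rightOuterJoin keeps every key of elems2 and pads the missing
# left-hand value with None
print(elems1.rightOuterJoin(elems2).collect())
# [((0, 0), (('A', 0, 90), ('B', 0, 573))), ((4, 0), (None, ('B', 0, 573)))]

# cartesian pairs every element of one RDD with every element of the
# other, ignoring keys entirely: 1 * 2 = 2 pairs here
print(elems1.cartesian(elems2).count())
# 2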