Can only zip with RDD which has the same number of partitions

Posted 2019-09-09 19:25

Question:

I have an IPython notebook with PySpark code that works fine on my machine, but when I try to run it on a different machine it throws an error at this line (the rdd3 line):

rdd2 = sc.parallelize(list1) 
rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))
list = rdd3.collect()

The error I get is:

    ValueError                                Traceback (most recent call last)
    <ipython-input-7-9daab52fc089> in <module>()

    ---> 16 rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))


    /usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in zip(self, other)
       1960 
       1961         if self.getNumPartitions() != other.getNumPartitions():
    -> 1962             raise ValueError("Can only zip with RDD which has the same number of partitions")
       1963 
       1964         # There will be an Exception in JVM if there are different number

    ValueError: Can only zip with RDD which has the same number of partitions

I don't know why this error occurs on one machine but not on the other machine.

Answer 1:

zip is, generally speaking, a tricky operation. It requires both RDDs to have not only the same number of partitions but also the same number of elements per partition.
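For example, two RDDs can have the same number of partitions and still fail to zip because the per-partition element counts differ: the partition-count check passes and the job fails later at execution time. A minimal sketch, assuming a plain local SparkContext sc (the exact splits may vary):

rdd_a = sc.parallelize(range(10), 2)  # 2 partitions
rdd_b = sc.parallelize(range(9), 2)   # also 2 partitions, but one element fewer

# Passes the partition-count check, yet fails at execution time with a
# SparkException about unequal numbers of elements per partition.
rdd_a.zip(rdd_b).collect()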

Excluding some special cases, this is guaranteed only if both RDDs have the same ancestor and there are no shuffles or operations that potentially change the number of elements (such as filter or flatMap) between the common ancestor and the current state. Typically this means only map (1-to-1) transformations.
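To make that concrete, zipping an RDD with a map-transformed copy of itself is safe, while a filter-transformed copy is not; a small sketch (not from the original answer):

base = sc.parallelize(range(10), 4)

# Safe: map is 1-to-1, so both the partitioning and the number of elements
# per partition are preserved.
base.zip(base.map(lambda x: x * 2)).collect()

# Risky: filter keeps the 4 partitions but may drop elements from some of
# them, so the per-partition counts can diverge and the zip can fail.
# base.zip(base.filter(lambda x: x % 2 == 0)).collect()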

If you know that the order is otherwise preserved but the number of partitions or the number of elements per partition differs, you can use a join with indices:

from operator import itemgetter

def custom_zip(rdd1, rdd2):
    # Sort each RDD by the index attached with zipWithIndex, using the same
    # number of partitions for both, then drop the index and zip the results.
    index = itemgetter(1)

    def prepare(rdd, npart):
        return (rdd.zipWithIndex()
                   .sortBy(index, numPartitions=npart)
                   .keys())

    npart = rdd1.getNumPartitions() + rdd2.getNumPartitions()

    return prepare(rdd1, npart).zip(prepare(rdd2, npart))

rdd1 = sc.parallelize(["a_{}".format(x) for x in range(20)], 5)
rdd2 = sc.parallelize(["b_{}".format(x) for x in range(20)], 10)

rdd1.zip(rdd2).take(5)
## ValueError                                Traceback (most recent call last)
## ...
## ValueError: Can only zip with RDD which has the same number of partitions

custom_zip(rdd1, rdd2).take(5)
## [('a_0', 'b_0'), ('a_1', 'b_1'), ('a_2', 'b_2'), 
##     ('a_3', 'b_3'), ('a_4', 'b_4')]
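If you want a literal join on the indices instead of the sort-based alignment above, a hedged sketch (the helper name zip_by_join is made up here) could look like this; it restores the original order with a final sort on the index:

def zip_by_join(rdd1, rdd2):
    # Build (index, value) pairs for both RDDs.
    indexed1 = rdd1.zipWithIndex().map(lambda vi: (vi[1], vi[0]))
    indexed2 = rdd2.zipWithIndex().map(lambda vi: (vi[1], vi[0]))
    return (indexed1.join(indexed2)  # (index, (value1, value2))
                    .sortByKey()     # restore the original order
                    .values())

zip_by_join(rdd1, rdd2).take(5)
## [('a_0', 'b_0'), ('a_1', 'b_1'), ('a_2', 'b_2'),
##     ('a_3', 'b_3'), ('a_4', 'b_4')]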

A Scala equivalent would be something like this:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Same idea as the Python version: index, sort by the index with a common
// number of partitions, drop the index, then zip.
def prepare[T: ClassTag](rdd: RDD[T], n: Int) =
  rdd.zipWithIndex.sortBy(_._2, true, n).keys

def customZip[T: ClassTag, U: ClassTag](rdd1: RDD[T], rdd2: RDD[U]) = {
  val n = rdd1.partitions.size + rdd2.partitions.size
  prepare(rdd1, n).zip(prepare(rdd2, n))
}

val rdd1 = sc.parallelize((0 until 20).map(i => s"a_$i"), 5)
val rdd2 = sc.parallelize((0 until 20).map(i => s"b_$i"), 10)

rdd1.zip(rdd2)

// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRD
//  ...

customZip(rdd1, rdd2).take(5)
// Array[(String, String)] = 
//   Array((a_0,b_0), (a_1,b_1), (a_2,b_2), (a_3,b_3), (a_4,b_4))
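As for why the original code fails on one machine but not another: sc.parallelize without an explicit numSlices argument falls back to sc.defaultParallelism, which depends on the machine and cluster configuration, so rdd1 and rdd2 can end up with different partition counts in one environment and matching counts in another. A quick check (in Python, using the RDDs from the question) would be:

# Compare the partition counts before zipping.
print(rdd1.getNumPartitions(), rdd2.getNumPartitions())

# One possible workaround (an assumption, since rdd1's origin is not shown
# in the question) is to give rdd2 the same number of partitions as rdd1:
rdd2 = sc.parallelize(list1, rdd1.getNumPartitions())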