Can only zip RDDs with same number of elements in

2019-02-24 19:39发布

I load a dataset

val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)

I want to put an index on this data thus

val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)

val res = tozip.zip(data)

Unfortunately i have the following error

Can only zip RDDs with same number of elements in each partition

How can i modify the number of element by partition if it is possible ?

2条回答
戒情不戒烟
2楼-- · 2019-02-24 20:03

Why it doesn't work?

The documentation for zip() states:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

So we need to make sure we meet 2 conditions:

  • both RDDs have the same number of partitions
  • respective partitions in those RDDs have exactly the same size

You are making sure that you will have the same number of partitions with repartition() but Spark doesn't guarantee that you will have the same distribution in each partition for each RDD.

Why is that?

Because there are different types of RDDs and most of them have different partitioning strategies! For example:

  • ParallelCollectionRDD is created when you parallelise a collection with sc.parallelize(collection) it will see how many partitions there should be, will check the size of the collection and calculate the step size. I.e. you have 15 elements in the list and want 4 partitions, first 3 will have 4 consecutive elements last one will have the remaining 3.
  • HadoopRDD if I remember correctly, one partition per file block. Even though you are using a local file internally Spark first creates a this kind of RDD when you read a local file and then maps that RDD since that RDD is a pair RDD of <Long, Text> and you just want String :-)
  • etc.etc.

In your example Spark internally does create different types of RDDs (CoalescedRDD and ShuffledRDD) while doing the repartitioning but I think you got the global idea that different RDDs have different partitioning strategies :-)

Notice that the last part of the zip() doc mentions the map() operation. This operation does not repartition as it's a narrow transformation data so it would guarantee both conditions.

Solution

In this simple example as it was mentioned you can do simply data.zipWithIndex. If you need something more complicated then creating the new RDD for zip() should be created with map() as mentioned above.

查看更多
SAY GOODBYE
3楼-- · 2019-02-24 20:10

I solved this by creating an implicit helper like so

implicit class RichContext[T](rdd: RDD[T]) {
  def zipShuffle[A](other: RDD[A])(implicit kt: ClassTag[T], vt: ClassTag[A]): RDD[(T, A)] = {
    val otherKeyd: RDD[(Long, A)] = other.zipWithIndex().map { case (n, i) => i -> n }
    val thisKeyed: RDD[(Long, T)] = rdd.zipWithIndex().map { case (n, i) => i -> n }
    val joined                    = new PairRDDFunctions(thisKeyed).join(otherKeyd).map(_._2)
    joined
  }
}

Which can then be used like

val rdd1   = sc.parallelize(Seq(1,2,3))
val rdd2   = sc.parallelize(Seq(2,4,6))
val zipped = rdd1.zipShuffle(rdd2) // Seq((1,2),(2,4),(3,6))

NB: Keep in mind that the join will cause a shuffle.

查看更多
登录 后发表回答