I load a dataset:
val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)
I want to attach an index to each line of this data, so I do:
val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)
val res = tozip.zip(data)
Unfortunately, I get the following error:
Can only zip RDDs with same number of elements in each partition
How can I change the number of elements per partition, if that is possible?
Why doesn't it work?
The documentation for zip() states:
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
So we need to make sure we meet 2 conditions:
- both RDDs have the same number of partitions
- respective partitions in those RDDs have exactly the same size
With repartition() you make sure that both RDDs have the same number of partitions,
but Spark doesn't guarantee that the elements are distributed across those partitions in the same way for each RDD.
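If you want to see this for yourself, here is a minimal sketch that prints the number of elements in each partition. The object name PartitionSizes, the helper sizes and the local[2] master are my own choices for illustration, and the 15-element range is just a stand-in for your data. I am deliberately not quoting the output, because the sizes after repartition() depend on how the shuffle distributes the elements.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionSizes {
  // glom() turns every partition into a single array, so the array length
  // is the number of elements held by that partition.
  def sizes[T](rdd: RDD[T]): Array[Int] =
    rdd.glom().map(_.length).collect()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partition-sizes")
    val sc = new SparkContext(conf)
    try {
      // Same number of partitions, but produced in two different ways.
      val sliced = sc.parallelize(1 to 15, 4)
      val shuffled = sc.parallelize(1 to 15).repartition(4)
      println("parallelize(_, 4): " + sizes(sliced).mkString(", "))
      println("repartition(4):    " + sizes(shuffled).mkString(", "))
    } finally sc.stop()
  }
}
```

zip() only works when the two lists of sizes are identical, position by position.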
Why is that?
Because there are different types of RDDs, and most of them have different partitioning strategies. For example:
- ParallelCollectionRDD is created when you parallelise a collection with sc.parallelize(collection). It looks at how many partitions there should be, checks the size of the collection and calculates the slice boundaries from that. E.g. with 15 elements in the list and 4 partitions, each partition gets a run of consecutive elements and the sizes differ by at most one (3, 4, 4 and 4, as far as I can tell from the slicing code).
- HadoopRDD has, if I remember correctly, one partition per file block. Even when you read a local file, Spark internally first creates this kind of RDD and then maps it, since it is a pair RDD of <LongWritable, Text> and you just want String :-)
- etc.
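To make the slicing arithmetic concrete, here is a small plain-Scala sketch (no Spark needed). It is modelled on how I understand ParallelCollectionRDD to compute its slice boundaries; the names SliceSketch, positions and sizes are mine, and the real implementation may differ between Spark versions, so treat it as an illustration rather than the actual source.

```scala
object SliceSketch {
  // (start, end) index pairs for each slice when `length` elements
  // are split into `numSlices` consecutive runs.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Number of elements in each slice.
  def sizes(length: Long, numSlices: Int): Seq[Int] =
    positions(length, numSlices).map { case (start, end) => end - start }

  def main(args: Array[String]): Unit =
    println(sizes(15, 4).mkString(", ")) // prints 3, 4, 4, 4
}
```

The sizes are fixed purely by the element count and the number of slices, which is a different rule from the one a file-based RDD or a shuffle uses.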
In your example Spark internally does create different types of RDDs (CoalescedRDD and ShuffledRDD) while doing the repartitioning, but I think you got the global idea that different RDDs have different partitioning strategies :-)
Notice that the last part of the zip() doc mentions the map() operation. This operation does not repartition the data, as it is a narrow transformation, so it guarantees both conditions.
Solution
In this simple example, as was already mentioned, you can simply use data.zipWithIndex. If you need something more complicated, then the second RDD passed to zip() should be derived from the first one with map(), as mentioned above.
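Both options can be sketched as follows. The object name IndexSketch, the helpers withIndex and withLength, the local[2] master and the three sample lines are all made up for illustration; in your case data would come from sc.textFile(...). I shift the index by one only because your original attempt used 1 to nb.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object IndexSketch {
  // Option 1: zipWithIndex() numbers the elements from 0; shift to a 1-based index
  // and put it first, like the (index, line) pairs in the question.
  def withIndex(data: RDD[String]): RDD[(Long, String)] =
    data.zipWithIndex().map { case (line, idx) => (idx + 1, line) }

  // Option 2: zip() is safe here because the second RDD is derived from `data`
  // through map(), so both sides share the same partitioning.
  def withLength(data: RDD[String]): RDD[(String, Int)] =
    data.zip(data.map(_.length))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("index-sketch")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for the CSV lines loaded with sc.textFile(...).
      val data = sc.parallelize(Seq("a,1", "bb,2", "ccc,3"), 2)
      withIndex(data).collect().foreach(println)
      withLength(data).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Note that zipWithIndex() has to run a Spark job to count the elements per partition when the RDD has more than one partition, but it does not shuffle the data.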
I solved this by creating an implicit helper like so:
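import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import scala.reflect.ClassTag

implicit class RichContext[T](rdd: RDD[T]) {
  // Zips two RDDs by element position, regardless of how each one is partitioned.
  def zipShuffle[A](other: RDD[A])(implicit kt: ClassTag[T], vt: ClassTag[A]): RDD[(T, A)] = {
    // Key every element by its position so that both sides can be joined on it.
    val otherKeyed: RDD[(Long, A)] = other.zipWithIndex().map { case (n, i) => i -> n }
    val thisKeyed: RDD[(Long, T)] = rdd.zipWithIndex().map { case (n, i) => i -> n }
    // The join pairs up elements with equal positions, then the position key is dropped.
    new PairRDDFunctions(thisKeyed).join(otherKeyed).map(_._2)
  }
}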
Which can then be used like this:
val rdd1 = sc.parallelize(Seq(1,2,3))
val rdd2 = sc.parallelize(Seq(2,4,6))
val zipped = rdd1.zipShuffle(rdd2) // contains (1,2), (2,4), (3,6); the join does not guarantee this order
NB: Keep in mind that the join will cause a shuffle.