I load a dataset:

```scala
val data = sc.textFile("/home/kybe/Documents/datasets/img.csv", defp)
```
I want to put an index on this data, so I do:

```scala
val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)
val res = tozip.zip(data)
```
Unfortunately I get the following error:

```
Can only zip RDDs with same number of elements in each partition
```

How can I modify the number of elements per partition, if that is possible?
**Why doesn't it work?**
The documentation for `zip()` states:

> Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
So we need to make sure we meet two conditions:

- both RDDs have the same number of partitions
- the corresponding partitions in those RDDs have exactly the same number of elements
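You can check both conditions yourself (a quick diagnostic sketch, not from the original post, reusing the question's `tozip` and `data`):

```scala
// Condition 1: number of partitions.
println(tozip.getNumPartitions == data.getNumPartitions)  // true after repartition()
// Condition 2: elements per partition; glom() exposes each partition as an array.
println(tozip.glom().map(_.length).collect().mkString(", "))
println(data.glom().map(_.length).collect().mkString(", "))
// The two size lists typically differ here, which is exactly what zip() rejects.
```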
You are making sure you have the same number of partitions with `repartition()`, but Spark does not guarantee that the elements are distributed the same way across those partitions for each RDD.

Why is that?
Because there are different types of RDDs, and most of them have different partitioning strategies! For example:

- `ParallelCollectionRDD` is created when you parallelize a collection with `sc.parallelize(collection)`. It checks how many partitions there should be and the size of the collection, and calculates the step size, distributing consecutive elements into roughly equal slices. E.g. with 15 elements in the list and 4 partitions, three partitions get 4 consecutive elements each and one gets the remaining 3 (see the sketch after this list).
- `HadoopRDD` is created when you read a file, with roughly one partition per file block. Even when you read a local file, Spark internally creates this kind of RDD first; it is a pair RDD of `<Long, Text>`, which then gets mapped, since you just want `String` :-)
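You can observe the `sc.parallelize` slicing directly (a small sketch, assuming a live `SparkContext` named `sc`):

```scala
// 15 elements sliced into 4 partitions by ParallelCollectionRDD.
val small = sc.parallelize(1 to 15, 4)
// glom() turns each partition into an array, so we can inspect its size.
small.glom().map(_.length).collect()   // e.g. Array(3, 4, 4, 4)
```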
In your example Spark internally creates different types of RDDs (`CoalescedRDD` and `ShuffledRDD`) while doing the repartitioning, but I think you got the general idea that different RDDs have different partitioning strategies :-)

Notice that the last part of the `zip()` doc mentions the `map()` operation. `map()` is a narrow transformation, so it does not redistribute the data, which guarantees both conditions.
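For instance (a minimal sketch reusing the question's `data`), an RDD derived from `data` via `map()` keeps exactly the same partitioning, so zipping the two is safe:

```scala
// `lengths` comes from `data` through a narrow transformation, so both RDDs
// have the same partitions with the same number of elements in each.
val lengths = data.map(_.length)
val pairs = data.zip(lengths)   // RDD[(String, Int)]; this zip is safe
```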
**Solution**

In this simple example, as was mentioned, you can simply use `data.zipWithIndex`. If you need something more complicated, then the new RDD for `zip()` should be created with `map()` as described above.
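A minimal sketch of the simple case (note that `zipWithIndex` puts the element first and the 0-based index second):

```scala
// zipWithIndex gives RDD[(String, Long)]: (element, index).
val withIndex = data.zipWithIndex()
// If you want (index, element) with a 1-based index, as in the original attempt:
val indexed = withIndex.map { case (line, i) => (i + 1, line) }
```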
I solved this by creating an implicit helper like so.
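The idea (a sketch; the name `zipShuffle` and the exact signature are assumptions): key both RDDs by their element index via `zipWithIndex`, then `join` on that key:

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper: zip two equal-length RDDs regardless of how their
// elements are distributed across partitions.
implicit class RichRDD[T](rdd: RDD[T]) {
  def zipShuffle[A](other: RDD[A])(implicit kt: ClassTag[T], vt: ClassTag[A]): RDD[(T, A)] = {
    // Key both RDDs by element index ...
    val thisKeyed  = rdd.zipWithIndex().map { case (v, i) => i -> v }
    val otherKeyed = other.zipWithIndex().map { case (v, i) => i -> v }
    // ... then join on the index and keep only the paired values.
    thisKeyed.join(otherKeyed).values
  }
}
```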
Which can then be used like:
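(a usage sketch, assuming the helper above is in scope)

```scala
val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(2, 4, 6))
// Unlike zip(), this works even when the per-partition counts differ.
val zipped = rdd1.zipShuffle(rdd2)   // pairs like (1,2), (2,4), (3,6)
```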
NB: Keep in mind that the `join` will cause a shuffle.