I would like to divide an RDD into a number of partitions corresponding to the number of distinct keys it contains (3 in this case):
RDD: [(1,a), (1,b), (1,c), (2,d), (3,e), (3,f), (3,g), (3,h), (3,i)]
What I do now guarantees that elements with the same key fall into the same partition:
[(1,a), (1,b), (1,c)]
[(2,d)]
[(3,e), (3,f), (3,g), (3,h), (3,i)]
This is how I partition the RDD:
val partitionedRDD = rdd.partitionBy(
  new PointPartitioner(rdd.keys.distinct().count().toInt)) // count() returns a Long
This is the PointPartitioner class:
class PointPartitioner(numParts: Int) extends org.apache.spark.Partitioner {

  override def numPartitions: Int = numParts

  // The partition is derived from the key alone, so all pairs with the
  // same key land in the same partition.
  override def getPartition(key: Any): Int = {
    key.hashCode % numPartitions
  }

  override def equals(other: Any): Boolean = other match {
    case dnp: PointPartitioner => dnp.numPartitions == numPartitions
    case _                     => false
  }
}
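For reference, here is how that getPartition rule spreads the sample keys (a plain-Scala check, no cluster needed): the partition depends only on the key, so the partition sizes simply mirror the key frequencies.

// Standalone check of the assignment rule above (same math as getPartition).
val keys = Seq(1, 1, 1, 2, 3, 3, 3, 3, 3)
val numPartitions = 3
val sizes = keys.groupBy(_.hashCode % numPartitions).map { case (p, ks) => p -> ks.size }
println(sizes) // e.g. Map(0 -> 5, 1 -> 3, 2 -> 1): all five pairs with key 3 end up together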
However, elements end up unbalanced across partitions. What I would like to obtain is an RDD partitioned like this, where all the partitions contain roughly the same number of elements while respecting the order of the keys:
[(1,a), (1,b), (1,c)]
[(2,d), (3,e), (3,f)]
[(3,g), (3,h), (3,i)]
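For concreteness, a minimal local sketch of the rule I have in mind (hypothetical, assuming the pairs are already sorted by key): the element at global position i would go to partition i / ceil(n / numPartitions).

// Hypothetical illustration of the balanced, order-preserving split I want.
val pairs = Seq((1, "a"), (1, "b"), (1, "c"), (2, "d"),
                (3, "e"), (3, "f"), (3, "g"), (3, "h"), (3, "i"))
val numPartitions = 3
val chunkSize = math.ceil(pairs.size.toDouble / numPartitions).toInt // 3
pairs.zipWithIndex
  .groupBy { case (_, i) => i / chunkSize } // position, not key, decides
  .toSeq.sortBy(_._1)
  .foreach { case (p, elems) => println(s"partition $p: ${elems.map(_._1)}") }
// partition 0: List((1,a), (1,b), (1,c))
// partition 1: List((2,d), (3,e), (3,f))
// partition 2: List((3,g), (3,h), (3,i))

The catch, as far as I can tell, is that Spark's Partitioner.getPartition only receives the key, not the element's position, so this rule does not translate directly into a custom partitioner like the one above.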
What could I try?