How can I partition an RDD respecting order?

Published 2020-05-09 22:33

Question:

I would like to divide an RDD into a number of partitions corresponding to the number of distinct keys it contains (3 in this case):

RDD: [(1,a), (1,b), (1,c), (2,d), (3,e), (3,f), (3,g), (3,h), (3,i)]

What I do now makes elements with the same key fall into the same partition:

[(1,a), (1,b), (1,c)]
[(2,d)]
[(3,e), (3,f), (3,g), (3,h), (3,i)]

This is how I partition:

val partitionedRDD = rdd.partitionBy(new PointPartitioner(
  rdd.keys.distinct().count().toInt))

This is the PointPartitioner class:

import org.apache.spark.Partitioner

class PointPartitioner(numParts: Int) extends Partitioner {

  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    // Hash-based assignment: a key always maps to the same partition,
    // no matter how many elements share that key.
    key.hashCode % numPartitions
  }

  override def equals(other: Any): Boolean = other match {
    case dnp: PointPartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}

However, elements are unbalanced across partitions. What I would like to obtain is an RDD partitioned like this, where all partitions contain roughly the same number of elements while respecting the order of the keys:

[(1,a), (1,b), (1,c)]
[(2,d), (3,e), (3,f)]
[(3,g), (3,h), (3,i)]

What could I try?

Answer 1:

Assigning your partitions like this

p1 [(1,a), (1,b), (1,c)]
p2 [(2,d), (3,e), (3,f)]
p3 [(3,g), (3,h), (3,i)]

would mean assigning the same key to different partitions (key 3 would land in either p2 or p3). A Partitioner is like a mathematical function: it cannot return several values for the same argument (what would the value depend on then?).

What you could do instead is to add something to your partition key so that you end up with more buckets (effectively splitting one set into smaller sets). But you have virtually no control over how Spark places your partitions onto the nodes, so data you wanted on the same node can end up spread across multiple nodes.
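As a minimal sketch of that idea (not from the original answer: the RangeIndexPartitioner name and the zipWithIndex step are my own illustration, and it assumes the RDD's current order is the one you want to keep), you can make each element's global position the partition key, so that contiguous index ranges of roughly equal size become the buckets:

import org.apache.spark.Partitioner

// Partitions by contiguous index ranges of roughly equal size,
// so order is preserved and partitions stay balanced.
class RangeIndexPartitioner(numParts: Int, totalElements: Long) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val idx = key.asInstanceOf[Long]
    // Elements per partition, rounded up; the last partition absorbs any remainder.
    val perPartition = math.ceil(totalElements.toDouble / numParts).toLong
    math.min((idx / perPartition).toInt, numParts - 1)
  }
}

// Usage sketch, assuming rdd: RDD[(Int, String)] is already in the desired order:
// val n = rdd.count()
// val balanced = rdd
//   .zipWithIndex()                       // ((key, value), globalIndex)
//   .map { case (kv, idx) => (idx, kv) }  // make the index the partition key
//   .partitionBy(new RangeIndexPartitioner(3, n))
//   .values                               // drop the helper index again

Note that here the original key (1, 2, 3) no longer decides the partition at all, which is exactly the tradeoff described above: key 3 ends up spanning two partitions.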

That really boils down to what job you would like to perform. I would recommend thinking about the outcome you want and seeing whether you can come up with a smart partition key with a reasonable tradeoff (if it is really necessary). Maybe you could key the values by the letter and then use operations like reduceByKey rather than groupByKey to get your final results?
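To illustrate that last point (the concatenation below is an assumption of mine, since the question never says what the final result should be): if all you need per key is an aggregate, reduceByKey combines values on each partition before the shuffle, whereas groupByKey ships every single value for a key across the network.

import org.apache.spark.sql.SparkSession

// Hypothetical example: suppose the final result per key is just the concatenated letters.
val spark = SparkSession.builder().appName("reduceVsGroup").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (1, "c"), (2, "d"), (3, "e")))

// reduceByKey merges values map-side, then shuffles only the partial results...
val concatenated = pairs.reduceByKey(_ + _)   // (1, "abc"), (2, "d"), (3, "e")

// ...whereas groupByKey shuffles every value for a key to a single place.
val grouped = pairs.groupByKey()              // (1, Iterable(a, b, c)), (2, Iterable(d)), ...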