How to Define Custom partitioner for Spark RDDs of

2019-01-05 00:00发布

问题:

I am new to Spark. I have a large dataset of elements[RDD] and I want to divide it into two exactly equal sized partitions maintaining order of elements. I tried using RangePartitioner like

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))

This doesn't give a satisfactory result because it divides roughly but not exactly equal sized maintaining order of elements. For example if there are 64 elements, we use Rangepartitioner, then it divides into 31 elements and 33 elements.

I need a partitioner such that I get exactly first 32 elements in one half and other half contains second set of 32 elements. Could you please help me by suggesting how to use a customized partitioner such that I get equally sized two halves, maintaining the order of elements?

回答1:

Partitioners work by assigning a key to a partition. You would need prior knowledge of the key distribution, or look at all keys, to make such a partitioner. This is why Spark does not provide you with one.

In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?

Anyway, let us say you have an RDD keyed by sequential Ints, and you know how many in total. Then you could write a custom Partitioner like this:

class ExactPartitioner[V](
    partitions: Int,
    elements: Int)
  extends Partitioner {

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements-1.
    return k * partitions / elements
  }
}


回答2:

This answer has some inspiration from Daniel, but provides a full implementation (using pimp my library pattern) with an example for peoples copy and paste needs :)

import RDDConversions._

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
  rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced, 
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
    .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) => 
        Iterator((i, iter.toIterable.last))).toArray().toList
      .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int = 
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}

Then in another file we have

// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = 
    new RichRDD[T](rdd)
  implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}

Then for your use case you just want (assuming it's already sorted)

import RDDConversions._

yourRdd.grouped(2)

Disclaimer: Not tested, kinda just wrote this straight into the SO answer



标签: