In Apache Spark, is it possible to specify a partition's preferred location?

Posted 2019-04-23 06:49

As of Spark 1.6+, the only API that supports customizing a partition's location is the one used when the RDD is created:

  /** Distribute a local Scala collection to form an RDD, with one or more
    * location preferences (hostnames of Spark nodes) for each object.
    * Create a new partition for each collection item. */
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
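
For illustration, a minimal usage sketch, assuming a live SparkContext sc; the hostnames "host-1" and "host-2" are hypothetical and must match the hostnames of actual executors for the hint to matter:

  // Build an RDD where each element becomes its own partition,
  // tagged with a preferred executor hostname.
  val rdd = sc.makeRDD(Seq(
    ("record-a", Seq("host-1")),
    ("record-b", Seq("host-2"))
  ))

  // preferredLocations exposes the hint the scheduler will see:
  rdd.partitions.foreach(p => println(rdd.preferredLocations(p)))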

Despite being very useful in some cases (e.g. when RDD.compute() has to access local resources, not just HDFS), this is the only place where such a setting is exposed, and it is quickly discarded after the first shuffle, where each downstream partition inherits its preferredLocation from its biggest parent:

  // snippet from org.apache.spark.rdd.ShuffledRDD.scala
  override protected def getPreferredLocations(partition: Partition): Seq[String] = {
    val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    tracker.getPreferredLocationsForShuffle(dep, partition.index)
  }

and after cogroup/join, where each downstream partition uses the locality of the first parent that has an explicit Partitioner.
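
This is easy to observe with the pinned rdd from the sketch above: after a shuffle, its preferredLocations no longer reflect host-1/host-2, and may be empty altogether (getPreferredLocationsForShuffle only proposes locations when spark.shuffle.reduceLocality.enabled is on and its size thresholds are met):

  // Narrow transformations keep the data in place, but a shuffle recomputes
  // locality from map-output statistics (getPreferredLocationsForShuffle),
  // discarding the hints attached by makeRDD:
  val shuffled = rdd.map(s => (s.length, s)).reduceByKey(_ + _)
  shuffled.partitions.foreach(p => println(shuffled.preferredLocations(p)))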

I'm wondering whether this design is deliberate, or whether a better solution for colocation exists. What do you think would be the best way to specify preferredLocation for a shuffled/coalesced/cogrouped/joined RDD? Do I have to write my own RDD subclasses to achieve this?
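
For what it's worth, the "own RDD subclass" route is at least mechanically simple: a one-parent wrapper that overrides getPreferredLocations while leaving the data and partitioning untouched. The sketch below is my own (the class name PinnedLocationRDD and the locations parameter are made up, not a Spark API), and like any preferred location it is only a scheduling hint, not a placement guarantee:

  import org.apache.spark.{Partition, TaskContext}
  import org.apache.spark.rdd.RDD
  import scala.reflect.ClassTag

  // Hypothetical wrapper: pins partition i of `prev` to locations(i).
  class PinnedLocationRDD[T: ClassTag](prev: RDD[T], locations: Seq[Seq[String]])
    extends RDD[T](prev) {

    override def getPartitions: Array[Partition] = firstParent[T].partitions

    override def compute(split: Partition, context: TaskContext): Iterator[T] =
      firstParent[T].iterator(split, context)

    // Preserve the parent's partitioner so downstream cogroups can stay narrow.
    override val partitioner = prev.partitioner

    override protected def getPreferredLocations(split: Partition): Seq[String] =
      locations(split.index)
  }

The catch, as described above, is that the very next shuffle discards these hints again, so such a wrapper would have to be re-applied after every shuffle boundary.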

Thanks a lot for your insight.

UPDATE: I conjectured a possible solution which, to my surprise, doesn't work:

In Apache Spark cogroup, how to make sure 1 RDD of >2 operands is not moved?

so I deleted the answer. If you have something that works, you are welcome to share it here; otherwise we will have to wait for https://issues.apache.org/jira/browse/SPARK-18078
