Enforce that a partition is stored on a specific executor

Posted 2019-02-19 14:58

Question:

I have a 5-partition RDD and 5 workers/executors. How can I ask Spark to save each RDD partition on a different worker (IP)?

Am I right that Spark can save several partitions on one worker and zero partitions on another? That is, I can specify the number of partitions, but Spark may still cache everything on a single node.

Replication is not an option, since the RDD is huge.

Workarounds I have found

getPreferredLocations

RDD's getPreferredLocations method does not give a 100% guarantee that a partition will be stored on the specified node. Spark will try to honour it for the duration of spark.locality.wait, but afterwards it will cache the partition on a different node.

As a workaround, you can set a very high value for spark.locality.wait and override getPreferredLocations. The bad news: you cannot do that in Java, you need to write Scala code, or at least wrap the Scala internals in Java code. For example:

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {
  val nodeIPs = Array("192.168.2.140", "192.168.2.157", "192.168.2.77")
  // Pin each partition to one of the listed nodes, round-robin by partition index
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length))
  // RDD is abstract, so compute and getPartitions also need implementations; both delegate to the parent
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    firstParent[U].iterator(split, context)
  override protected def getPartitions: Array[Partition] = firstParent[U].partitions
}
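
A minimal usage sketch of the wrapper above, just to show where spark.locality.wait fits in; the application name, the 30s value and the input path are placeholders, not anything from the original setup:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("node-affinity-example")        // placeholder application name
  .set("spark.locality.wait", "30s")          // much higher than the 3s default, so Spark keeps waiting for the preferred node
val sc = new SparkContext(conf)

val base = sc.textFile("hdfs:///path/to/input")   // placeholder input path
val pinned = new NodeAffinityRDD(base).cache()    // partitions now prefer the nodes listed in nodeIPs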

SparkContext's makeRDD

SparkContext has a makeRDD method. This method lacks documentation. As I understand it, I can specify preferred locations and then set a high value for spark.locality.wait. The bad news: the preferred locations are discarded on the first shuffle/join/cogroup operation.
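
To make that concrete, a small sketch of makeRDD with explicit location preferences; sc is an existing SparkContext, and the file names and host IPs are placeholders:

// Each element becomes its own partition, tagged with preferred host names
val withLocations: Seq[(String, Seq[String])] = Seq(
  ("sales-001.parquet", Seq("192.168.2.140")),
  ("sales-002.parquet", Seq("192.168.2.157")),
  ("sales-003.parquet", Seq("192.168.2.77"))
)
val fileNames = sc.makeRDD(withLocations)  // RDD[String], one partition per element, scheduled near its preferred host

As noted above, these preferences only influence the scheduling of the first pass over the data; a shuffle produces new partitions without them.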


Both approaches share a drawback: a too-high spark.locality.wait can starve your cluster if some nodes become unavailable.

P.S. More context

I have up to 10,000 sales-XXX.parquet files, each representing sales of different goods in a different region. Each sales-XXX.parquet can vary from a few KB to a few GB, and all of them together can take up tens or hundreds of GB in HDFS. I need full-text search across all sales, so I have to index each sales-XXX.parquet one by one with Lucene. Now I have two options:

  1. Keep the Lucene indexes in Spark. There is already a solution for this, but it looks rather suspicious. Are there any better solutions?
  2. Keep the Lucene indexes on the local file system. Then I can map-reduce over the results of each worker's index lookup (see the sketch after this list). But this approach requires each worker node to keep an equal amount of data. How can I ensure that Spark keeps an equal amount of data on each worker node?
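
For option 2, a rough sketch of what the per-worker lookup could look like, assuming each worker keeps its Lucene index under a local path such as /data/lucene-index (hypothetical), and noting that parallelize does not by itself guarantee one task per node, which is exactly the placement problem above:

import java.nio.file.Paths
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.DirectoryReader
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.FSDirectory

val numWorkers = 5
val hits = sc.parallelize(0 until numWorkers, numWorkers).mapPartitions { _ =>
  // Open the worker-local index (hypothetical path) and run the query locally
  val reader = DirectoryReader.open(FSDirectory.open(Paths.get("/data/lucene-index")))
  val searcher = new IndexSearcher(reader)
  val query = new QueryParser("content", new StandardAnalyzer()).parse("wireless headphones") // placeholder query
  val ids = searcher.search(query, 100).scoreDocs.map(sd => searcher.doc(sd.doc).get("id"))
  reader.close()
  ids.iterator
}.collect()  // merge the per-worker hits on the driver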