I have an RDD with 5 partitions and 5 workers/executors. How can I ask Spark to store each of the RDD's partitions on a different worker (IP)?
Am I right in saying that Spark can store several partitions on one worker and 0 partitions on another? That is, I can specify the number of partitions, but Spark can still cache everything on a single node.
Replication is not an option, since the RDD is huge.
Workarounds I have found
getPreferredLocations
RDD's getPreferredLocations method does not provide a 100% guarantee that a partition will be stored on the specified node. Spark will try for the duration of spark.locality.wait, but afterwards it will cache the partition on a different node.
As a workaround, you can set a very high value for spark.locality.wait and override getPreferredLocations. The bad news: you cannot do that with Java; you need to write Scala code, or at least Scala internals wrapped with Java code. For example:
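import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {
  val nodeIPs = Array("192.168.2.140", "192.168.2.157", "192.168.2.77")
  override def getPartitions: Array[Partition] = firstParent[U].partitions // RDD is abstract: reuse the parent's partitions
  override def compute(split: Partition, context: TaskContext): Iterator[U] = firstParent[U].iterator(split, context) // pass the parent's data through unchanged
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length)) // one preferred node per partition, round-robin by index
}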
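To see whether the preferred locations are actually honored, it may help to ask each partition which host its task ran on. The following is only a sketch: LocalityCheck, hostsByPartition and confWithLongLocalityWait are hypothetical names, and the "1h" value for spark.locality.wait is an arbitrary assumption standing in for "very high".

```scala
import java.net.InetAddress
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LocalityCheck {
  // Hypothetical helper: maps each partition index to the host name
  // of the executor that ran the task for that partition.
  def hostsByPartition[T](rdd: RDD[T]): Map[Int, String] =
    rdd.mapPartitionsWithIndex { (index, _) =>
      Iterator(index -> InetAddress.getLocalHost.getHostName)
    }.collect().toMap

  // Assumption: "1h" is just an example of a very long locality wait.
  def confWithLongLocalityWait(appName: String): SparkConf =
    new SparkConf().setAppName(appName).set("spark.locality.wait", "1h")
}
```

Calling LocalityCheck.hostsByPartition on an RDD wrapped in NodeAffinityRDD should then show one host per partition index. Note that it reports where the task ran at that moment, which is not a guarantee about where a cached block will stay.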
SparkContext's makeRDD
SparkContext has a makeRDD method. This method lacks documentation. As I understand it, I can specify preferred locations and then set a high value for spark.locality.wait. The bad news: the preferred locations will be discarded on the first shuffle/join/cogroup operation.
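For reference, here is a minimal sketch of how I understand the makeRDD overload that takes location preferences: each element is paired with a list of preferred hosts, and each element gets its own partition. The file names are hypothetical placeholders, and MakeRddSketch and build are names made up for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MakeRddSketch {
  // Hypothetical file names, each paired with the worker IP it should prefer.
  val filesWithHosts: Seq[(String, Seq[String])] = Seq(
    "sales-001.parquet" -> Seq("192.168.2.140"),
    "sales-002.parquet" -> Seq("192.168.2.157"),
    "sales-003.parquet" -> Seq("192.168.2.77")
  )

  // The explicit type parameter selects the overload that accepts
  // (element, preferredHosts) pairs rather than a plain Seq[T].
  def build(sc: SparkContext): RDD[String] =
    sc.makeRDD[String](filesWithHosts)
}
```

The preferences can be read back with rdd.preferredLocations(partition), which is a convenient way to confirm they were registered before any shuffle drops them.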
Both approaches have the drawback that too high a spark.locality.wait can cause your cluster to starve if some of the nodes become unavailable.
P.S. More context
I have up to 10,000 sales-XXX.parquet files; each represents sales of different goods in a different region. Each sales-XXX.parquet file can vary from a few KB to a few GB. All sales-XXX.parquet files together can take up tens or hundreds of GB on HDFS.
I need full-text search through all sales. I have to index each sales-XXX.parquet file one by one with Lucene. And now I have two options:
- Keep the Lucene indexes in Spark. There is already a solution for this, but it looks pretty suspicious. Are there any better solutions?
- Keep the Lucene indexes on the local file system. Then I can map-reduce over the results of each worker's index lookup. But this approach requires that each worker node keep an equal amount of data. How can I ensure that Spark keeps an equal amount of data on each worker node?
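To make "equal amount of data" concrete, here is one possible way to plan the file-to-worker assignment up front. This is my own assumption, not something Spark does for you: a greedy heuristic that takes files largest-first and gives each to the currently least-loaded host. BalancedAssignment and assign are hypothetical names, and the sizes are assumed to be known in advance (for example from the HDFS file listing).

```scala
import scala.collection.mutable

object BalancedAssignment {
  // Greedy heuristic: process files from largest to smallest and assign
  // each one to the host that currently holds the fewest bytes.
  def assign(fileSizes: Map[String, Long], hosts: Seq[String]): Map[String, String] = {
    require(hosts.nonEmpty, "at least one host is required")
    val load = mutable.Map(hosts.map(_ -> 0L): _*)
    fileSizes.toSeq
      .sortBy { case (name, size) => (-size, name) } // largest first, name breaks ties
      .map { case (file, size) =>
        val target = hosts.minBy(load) // least-loaded host; ties go to the earliest host
        load(target) += size
        file -> target
      }
      .toMap
  }
}
```

The resulting map could then be turned into (file, Seq(host)) pairs and passed to makeRDD as preferred locations, with the same caveat as above: they are preferences, not guarantees.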