I have two RDDs with different partitioners.
case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)
rdd1 is an RDD of Person, which I partitioned by the person's age and then re-keyed by school:
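// persons: the raw RDD[Person] (hypothetical name)
val rdd1: RDD[(String, Person)] = persons
  .keyBy(_.age)                          // (age, person)
  .partitionBy(new HashPartitioner(10))  // all persons with the same age share a partition
  .mapPartitions(iter =>
    iter.map { case (_, person) =>
      (person.school, person)            // re-key by school; rows stay in their age partitions
    })                                   // preservesPartitioning defaults to false, so the partitioner is dropped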
rdd2 is an RDD of School, grouped by school name:
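// schools: the raw RDD[School] (hypothetical name); groupBy shuffles into a HashPartitioner on the name
val rdd2: RDD[(String, Iterable[School])] = schools.groupBy(_.name)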
Now, rdd1 is partitioned by the person's age, so all persons with the same age go to the same partition. And rdd2 is partitioned (by default) by the school name.
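A quick way to see what Spark itself knows about each layout is to inspect the partitioner field. The sketch below is a hedged illustration that assumes a standalone Scala script with spark-core on the classpath (in spark-shell, drop the SparkContext and stop lines and use the provided sc); the sample data and the names people, byAge, bySchool and schoolsByName are hypothetical. It shows that the age-keyed RDD carries a HashPartitioner, that re-keying by school with mapPartitions leaves no partitioner at all, and that groupBy gives the school RDD a HashPartitioner on the name.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)

val sc = new SparkContext(new SparkConf().setAppName("partitioner-check").setMaster("local[2]"))

// Hypothetical sample data.
val people = sc.parallelize(Seq(
  Person("ann", 20, "north"), Person("bob", 21, "south"), Person("cid", 20, "south")))
val schools = sc.parallelize(Seq(School("north", "1 Main St"), School("south", "2 Oak Ave")))

// Partitioned by age: Spark records a HashPartitioner with 10 partitions.
val byAge: RDD[(Int, Person)] = people.keyBy(_.age).partitionBy(new HashPartitioner(10))
val byAgePartitioner = byAge.partitioner

// Re-keyed by school: rows do not move, but the partitioner becomes None.
val bySchool: RDD[(String, Person)] =
  byAge.mapPartitions(iter => iter.map { case (_, p) => (p.school, p) })
val bySchoolPartitioner = bySchool.partitioner

// groupBy shuffles the schools into a HashPartitioner on the school name.
val schoolsByName: RDD[(String, Iterable[School])] = schools.groupBy(_.name)
val schoolsPartitioner = schoolsByName.partitioner

sc.stop()
```

Because bySchool has no partitioner, a join on school cannot tell that its rows are grouped by age, so it has no partitioning to preserve on that side.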
I want to do rdd1.leftOuterJoin(rdd2) in such a way that rdd1 doesn't get shuffled, because rdd1 is much bigger than rdd2. Also, I'm writing the result to Cassandra, which is partitioned on age, so the current partitioning of rdd1 will speed up the writes later.
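For reference, here is a minimal sketch of the leftOuterJoin itself, assuming a standalone Scala script in local mode and hypothetical sample data (persons, schools and addressByAge are illustrative names). Every person is kept, and the school side is Some(...) on a match or None otherwise. In this setup only the grouped school RDD has a partitioner, so Spark reuses it for the join and shuffles the person side, which is exactly the situation the question wants to avoid.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)

val sc = new SparkContext(new SparkConf().setAppName("left-outer-join").setMaster("local[2]"))

// Hypothetical data: "dan" attends a school with no School record.
val persons: RDD[(String, Person)] = sc.parallelize(Seq(
  Person("ann", 20, "north"), Person("dan", 22, "west"))).keyBy(_.school)
val schools: RDD[(String, Iterable[School])] =
  sc.parallelize(Seq(School("north", "1 Main St"))).groupBy(_.name)

// Left outer join on the school name: unmatched persons get None on the right.
val joined: RDD[(String, (Person, Option[Iterable[School]]))] = persons.leftOuterJoin(schools)
val joinedPartitioner = joined.partitioner   // taken from the grouped schools RDD
val schoolsPartitioner = schools.partitioner

// Collect (age -> first matching school address) to inspect the result.
val addressByAge: Map[Int, Option[String]] = joined
  .map { case (_, (p, schoolOpt)) => (p.age, schoolOpt.flatMap(_.headOption).map(_.address)) }
  .collect()
  .toMap

sc.stop()
```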
Is there a way to join these two RDDs without:
1. shuffling rdd1, and
2. broadcasting rdd2, because rdd2 is bigger than the available memory?
Note: the joined RDD should be partitioned by age.
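Suppose you have two RDDs, rdd1 and rdd2, and want to join them. If an RDD is already partitioned (its partitioner is set), calling rdd3 = rdd1.join(rdd2) can reuse that partitioner instead of creating a new one: when rdd1 has a hash partitioner and at least as many partitions as rdd2, rdd3 takes rdd1's hash partitioner, and rdd1 is not shuffled; only rdd2 is. This only avoids shuffling rdd1 if rdd1 is partitioned on the join key itself.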
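The behavior described above can be checked with a small sketch, assuming a standalone Scala script with spark-core in local mode and hypothetical integer-keyed data (left and right are illustrative names). The left side is hash-partitioned on the join key, the right side has no partitioner and fewer partitions, and the join result carries the left side's HashPartitioner. Since the join partitioner equals left's partitioner, Spark uses a narrow (one-to-one) dependency on left and shuffles only right.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("join-partitioner").setMaster("local[2]"))

// Left side: already hash-partitioned on the join key.
val left = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"), 4).partitionBy(new HashPartitioner(4))
// Right side: no partitioner and fewer partitions.
val right = sc.parallelize(Seq(1 -> "x", 3 -> "y"), 2)

val joined = left.join(right)
val joinedPartitioner = joined.partitioner  // reused from left
val leftPartitioner = left.partitioner
val result = joined.collect().toMap

sc.stop()
```

If left were instead keyed by one field but laid out by another (as rdd1 is in the question, keyed by school but grouped by age), its partitioner would be None and this reuse would not apply.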