How can I efficiently join a large rdd to a very l

2020-06-03 04:50发布

I have two RDDs. One RDD is between 5-10 million entries and the other RDD is between 500 million - 750 million entries. At some point, I have to join these two rdds using a common key.

val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);

When spark decides to do this join, it decides to do a ShuffledHashJoin. This causes many of the items in rddB to be shuffled on the network. Likewise, some of rddA are also shuffled on the network. In this case, rddA is too "big" to use as a broadcast variable, but seems like a BroadcastHashJoin would be more efficient. Is there to hint to spark to use a BroadcastHashJoin? (Apache Flink supports this through join hints).

If not, is the only option to increase the autoBroadcastJoinThreshold?

Update 7/14

My performance issue appears to be squarely rooted in repartitioning. Normally, an RDD read from HDFS would be partitioned by block, but in this case, the source was a parquet datasource [that I made]. When spark (databricks) writes the parquet file, it writes one file per partition, and identically, it reads one partition per file. So, the best answer I've found is that during production of the datasource, to partition it by key then, write out the parquet sink (which is then naturally co-partitioned) and use that as rddB.

The answer given is correct, but I think the details about parquet datasource may be useful to someone else.

1条回答
forever°为你锁心
2楼-- · 2020-06-03 05:06

You can partition RDD's with the same partitioner, in this case partitions with the same key will be collocated on the same executor.

In this case you will avoid shuffle for join operations.

Shuffle will happen only once, when you'll update parititoner, and if you'll cache RDD's all joins after that should be local to executors

import org.apache.spark.SparkContext._

class A
class B

val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???

val partitioner = new HashPartitioner(1000)

rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()

Also you can try to update broadcast threshold size, maybe rddA can broadcasted:

--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb

We use 400mb for broadcast joins, and it works well.

查看更多
登录 后发表回答