How to merge two presorted RDDs in Spark?

Posted 2019-05-14 06:06

Question:

I have two large CSV files, each presorted by one of its columns. Is there a way to use the fact that they are already sorted to get a new sorted RDD faster, without doing a full sort again?

Answer 1:

The short answer: No, there is no way to leverage the fact that two input RDDs are already sorted when using the sort facilities offered by Apache Spark.

The long answer: Under certain conditions, there might be a better way than using sortBy or sortByKey.

The most obvious case is when the input RDDs are already sorted and cover distinct, non-overlapping ranges. In this case, rdd1.union(rdd2) is the fastest (virtually zero-cost) way to combine the input RDDs, assuming that all elements in rdd1 come before all elements in rdd2 (according to the chosen ordering).
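The precondition for the union shortcut can be sketched in plain Python, with sorted lists standing in for the two RDDs. The function names can_concatenate and concat_if_disjoint are made up for this illustration and are not part of Spark. In Spark itself, the check would presumably compare the last key of rdd1 with the first key of rdd2 before calling rdd1.union(rdd2).

```python
def can_concatenate(first_sorted, second_sorted, key=lambda x: x):
    """True if every element of first_sorted orders at or before every
    element of second_sorted. Both inputs must already be sorted."""
    if not first_sorted or not second_sorted:
        return True
    # For sorted inputs it is enough to compare the boundary elements.
    return key(first_sorted[-1]) <= key(second_sorted[0])


def concat_if_disjoint(first_sorted, second_sorted, key=lambda x: x):
    """Concatenate two sorted lists, refusing if their ranges overlap.

    Plain-Python stand-in for rdd1.union(rdd2) on distinct ranges."""
    if not can_concatenate(first_sorted, second_sorted, key):
        raise ValueError("ranges overlap; a real merge is required")
    return first_sorted + second_sorted
```

If the check fails, the inputs overlap and one of the merge strategies described next is needed instead.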

When the ranges of the input RDDs overlap, things get trickier. Assuming that the target RDD only needs a single partition, it might be efficient to call toLocalIterator on both RDDs and then merge the two iterators manually. If the result has to be an RDD, one could do this inside the compute method of a custom RDD type, which reads the input RDDs and generates the merged output.
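A minimal sketch of the single-partition merge, in plain Python. The two lists are placeholders for what toLocalIterator would yield for each RDD, and merge_sorted is a hypothetical helper name. It relies on heapq.merge from the standard library, which consumes both inputs lazily and does one linear pass rather than a full sort.

```python
import heapq
from operator import itemgetter


def merge_sorted(left, right, key=None):
    """Lazily merge two iterables that are each already sorted by key.

    Only one pending element per input is held at a time, so the inputs
    can be iterators over data much larger than memory."""
    return heapq.merge(left, right, key=key)


# Stand-ins for rdd1.toLocalIterator() and rdd2.toLocalIterator();
# each row is (sort_column, payload) and is presorted by sort_column.
left_rows = iter([(1, "a"), (4, "d"), (6, "f")])
right_rows = iter([(2, "b"), (3, "c"), (5, "e")])

merged_rows = list(merge_sorted(left_rows, right_rows, key=itemgetter(0)))
```

Note that this pulls all data through a single process (the driver, in Spark terms), which is why it only fits the single-partition case.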

When the inputs are large and thus consist of many partitions, things get even trickier. In this case, you probably want multiple partitions in the output RDD as well. You could use the custom RDD mentioned earlier, but create multiple partitions (using a RangePartitioner). Each partition would cover a distinct range of elements (in the optimal case, these ranges would cover roughly equally sized parts of the output).

The tricky part is to avoid processing the complete input RDDs multiple times inside compute. This can be done efficiently with filterByRange from OrderedRDDFunctions when the input RDDs use a RangePartitioner. When they do not use a RangePartitioner, but you know that the partitions are ordered internally and also have a global order, you would first need to find the effective ranges covered by these partitions by actually probing the data.
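The multi-partition idea can be illustrated in plain Python as follows. This is only a sketch under several assumptions: sorted lists stand in for the input RDDs, the split points are given rather than sampled, and merge_by_ranges is a made-up name. Binary search (bisect) plays the role that filterByRange would play in Spark, so each output partition touches only its own slice of each input. The sketch uses half-open ranges so that no element lands in two partitions; filterByRange itself is inclusive on both bounds, so a real implementation would have to handle the boundary keys explicitly.

```python
import bisect
import heapq


def merge_by_ranges(left, right, boundaries):
    """Merge two sorted lists range by range.

    boundaries is a sorted list of split points. They define
    len(boundaries) + 1 half-open key ranges, one per output partition:
    (-inf, b0), [b0, b1), ..., [b_last, +inf).
    Returns a list of sorted partitions whose concatenation is sorted."""
    edges = [None] + list(boundaries) + [None]
    partitions = []
    for lower, upper in zip(edges, edges[1:]):
        slices = []
        for data in (left, right):
            # Binary search locates the slice without scanning the input.
            start = 0 if lower is None else bisect.bisect_left(data, lower)
            stop = len(data) if upper is None else bisect.bisect_left(data, upper)
            slices.append(data[start:stop])
        # Each partition is an independent linear merge of two slices.
        partitions.append(list(heapq.merge(*slices)))
    return partitions


left_keys = [1, 3, 5, 7, 9]
right_keys = [2, 3, 6, 8, 10]
output_partitions = merge_by_ranges(left_keys, right_keys, boundaries=[4, 8])
```

Because the ranges are disjoint, the per-range merges are independent of each other, which is what would allow them to run as separate partitions in parallel.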

As the multiple partition case is rather complex, I would check whether the custom-made sort is really faster than simply using sortBy or sortByKey. The logic for sortBy and sortByKey is highly optimized regarding the shuffling process (transferring data between nodes). For this reason, it might well be that for many cases these methods are faster than the custom-made logic, even though the custom-made logic could be O(n) while sortBy / sortByKey can be O(n log(n)) at best.

If you are interested in learning more about the shuffling logic used by Apache Spark, there is an article explaining the basic concept.