I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?
I see that RDD has a take(i: Int) method, which returns the first i elements. But there is no corresponding method to take the last i elements, or i elements from the middle starting at a certain index.
I don't think there is an efficient method to do this yet, but the easy way is using filter(). Let's say you have an RDD, pairs, with key-value pairs and you only want elements from 60 to 80 inclusive; just do something like the sketch below.
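A minimal sketch of that filter, assuming the keys are integers that encode the positions you want (the RDD name pairs comes from the text above; everything else is illustrative):

```scala
// Keep only the pairs whose key falls in [60, 80]. This still scans the whole
// RDD, which is why it is not an efficient range query.
val slice = pairs.filter { case (k, _) => k >= 60 && k <= 80 }
slice.collect()
```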
I think it's possible that this could be done more efficiently in the future by using sortByKey and saving information about the range of values mapped to each partition. Keep in mind this approach would only save anything if you were planning to query the range multiple times, because the sort is obviously expensive.

From looking at the Spark source, it would definitely be possible to do efficient range queries using RangePartitioner: it keeps a private member with knowledge of all the upper bounds of the partitions, so it would be easy to only query the necessary partitions. It looks like this is something Spark users may see in the future: SPARK-911.

UPDATE: Way better answer, based on the pull request I'm writing for SPARK-911. It will run efficiently if the RDD is sorted and you query it multiple times.
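The pull-request code itself isn't reproduced above, but the idea can be sketched roughly as follows, assuming an integer-keyed pair RDD (the names sorted, lower, and upper are illustrative and are reused by the glom example below):

```scala
import org.apache.spark.RangePartitioner

// Sort once (expensive) so the RDD is range-partitioned, then cache it for repeated queries.
val sorted = sc.parallelize(1 to 100).map(x => (x, x)).sortByKey().cache()
val (lower, upper) = (60, 80)

// sortByKey leaves a RangePartitioner on the RDD; its bounds tell us which
// partitions can possibly contain keys in [lower, upper].
val part = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]
val partitionRange = part.getPartition(lower) to part.getPartition(upper)

// Only iterate the partitions that can hold the range; skip the rest entirely.
val result = sorted.mapPartitionsWithIndex({ (i, iter) =>
  if (partitionRange.contains(i)) iter.filter { case (k, _) => k >= lower && k <= upper }
  else Iterator.empty
}, preservesPartitioning = true).collect()
```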
If having the whole partition in memory is acceptable, you could even do something like this:

```scala
val glommedAndCached = sorted.glom().cache()
glommedAndCached.map(a => a.slice(a.search(lower), a.search(upper) + 1)).collect()
```
(search is not a member, BTW; I just made an implicit class that has a binary search function, not shown here.)

How big is your data set? You might be able to do what you need with something like the sketch below.
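The original code sample isn't preserved above; given the "inefficient but fine for small data" caveat that follows, one reconstruction in that spirit is a driver-side take and drop (a sketch, assuming an RDD named data and counting elements from 1):

```scala
// take(80) pulls the first 80 elements into the driver as a local Array;
// drop(59) then discards the first 59, leaving elements 60 through 80.
val slice = data.take(80).drop(59)
```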
This seems inefficient, but for small to medium-sized data it should work.
Is it possible to solve this in another way? What's the case for picking exactly a certain range out of the middle of your data? Would takeSample serve you better?

The following should be able to get the range. Note that the cache will save you some overhead, because internally zipWithIndex needs to scan the RDD partitions to get the number of elements in each partition.
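A sketch of that zipWithIndex approach, assuming rdd is the hundred-element RDD from the question and that 60 to 80 refers to element positions:

```scala
// Cache the source RDD: zipWithIndex runs an extra job to count the elements in
// every partition before zipping, so without the cache the RDD is scanned twice.
val cached = rdd.cache()

val slice = cached.zipWithIndex()                       // RDD[(element, index)]
  .filter { case (_, idx) => idx >= 60 && idx <= 80 }   // keep positions 60..80
  .map { case (elem, _) => elem }                       // drop the index again
slice.collect()
```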
For those who stumble on this question looking for a Spark 2.x-compatible answer, you can use filterByRange.
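A sketch of how that might look (the example data and names are illustrative); filterByRange comes from OrderedRDDFunctions, so it needs a pair RDD with an ordered key, and it is only cheaper than a plain filter when the RDD is range-partitioned, e.g. after sortByKey:

```scala
// With a range-partitioned RDD, filterByRange only scans the partitions that can
// contain keys in [60, 80]; otherwise it falls back to filtering every partition.
val byKey = sc.parallelize(1 to 100).map(x => (x, x)).sortByKey()
val slice = byKey.filterByRange(60, 80)   // bounds are inclusive
slice.values.collect()
```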