How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD has approximately 700,000 elements and is therefore too large to collect in order to find the median.
This question is similar to How can I calculate exact median with Apache Spark? However, the answer to that question is in Scala, which I do not know.
Using the thinking behind the Scala answer, I am trying to write a similar answer in Python.
I know I first want to sort the RDD, but I do not know how. I see the sortBy method (sorts this RDD by the given keyfunc) and the sortByKey method (sorts this RDD, which is assumed to consist of (key, value) pairs). I think both expect key-value pairs, and my RDD only has integer elements.
- First, I was thinking of doing myrdd.sortBy(lambda x: x)?
- Next I will find the length of the rdd (rdd.count()).
- Finally, I want to find the element or the 2 elements at the center of the rdd. I need help with this step too (rough sketch after this list).
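Roughly, this is the shape of what I have in mind, although I have not tested it and I am not sure the last step is the right way to pull out the middle element(s):

```python
sorted_rdd = myrdd.sortBy(lambda x: x)
n = sorted_rdd.count()

# pair each element with its position so the middle one(s) can be looked up by index
indexed = sorted_rdd.zipWithIndex().map(lambda xi: (xi[1], xi[0]))

if n % 2 == 1:
    median = indexed.lookup(n // 2)[0]
else:
    median = (indexed.lookup(n // 2 - 1)[0] + indexed.lookup(n // 2)[0]) / 2.0
```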
EDIT:
I had an idea. Maybe I can index my RDD and then key = index and value = element. And then I can try to sort by value? I don't know if this is possible because there is only a sortByKey method.
Spark 2.0+:
You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm.

Python:
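For instance, with a hypothetical DataFrame df that has a numeric column named x:

```python
df.approxQuantile("x", [0.5], 0.25)
```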
In Scala, the same method is available as df.stat.approxQuantile.
The last parameter is the relative error: the lower the number, the more accurate the result and the more expensive the computation.
Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:
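For example, again with a hypothetical df:

```python
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
```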
and the Scala API has an equivalent overload that takes arrays of column names.
Spark < 2.0
Python
As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, as in your case, then simply collect it and compute the median locally:
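For example, assuming sc is an existing SparkContext and using random integers as a stand-in for your data:

```python
import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

median = np.median(rdd.collect())
```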
It takes around 0.01 seconds on my few-years-old computer and around 5.5 MB of memory.
If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess up anything):
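One possible sketch, assuming a numeric RDD: it sorts the data, keys each value by its rank, and interpolates between the two ranks closest to the requested quantile.

```python
import time
from math import floor

def quantile(rdd, p, sample=None, seed=None):
    """Return the p-th quantile (0 <= p <= 1) of a numeric RDD.

    sample: optional fraction of the RDD to use instead of the whole dataset.
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else int(time.time())
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    # sort the values and key each one by its position so it can be looked up by rank
    indexed = (rdd
               .sortBy(lambda x: x)
               .zipWithIndex()
               .map(lambda xi: (xi[1], xi[0]))
               .cache())

    n = indexed.count()
    h = (n - 1) * p

    # linear interpolation between the two closest ranks
    lower = indexed.lookup(int(floor(h)))[0]
    upper = indexed.lookup(min(int(floor(h)) + 1, n - 1))[0]
    return lower + (h - floor(h)) * (upper - lower)
```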
And some tests:
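For example (reusing quantile from above, with sc as an existing SparkContext):

```python
import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

assert abs(quantile(rdd, 0.5) - np.median(rdd.collect())) < 1e-6
assert abs(quantile(rdd, 0.25) - np.percentile(rdd.collect(), 25)) < 1e-6
```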
Finally, let's define median:
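A simple way to do that is to fix p at 0.5:

```python
from functools import partial

median = partial(quantile, p=0.5)

median(rdd)               # exact median
median(rdd, sample=0.1)   # estimate from a 10% sample
```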
So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?
Language independent (Hive UDAF):
If you use HiveContext you can also use Hive UDAFs: percentile for integral values and percentile_approx for continuous values.
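For example, assuming sqlContext is a HiveContext and rdd holds the numbers:

```python
# register the data as a temporary table with a single numeric column x
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

# integral values
sqlContext.sql("SELECT percentile(CAST(x AS BIGINT), 0.5) FROM df")

# continuous values
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
```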
In percentile_approx you can pass an additional argument which determines the number of records to use.

Adding a solution if you want an RDD method only and don't want to move to DataFrames. This snippet can get you a percentile for an RDD of doubles.
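A rough sketch of one way to do it, using the nearest-rank definition of a percentile (the function name is illustrative):

```python
from math import ceil

def rdd_percentile(rdd, percentile):
    """Return the given percentile (0-100) of an RDD of doubles, using only RDD operations."""
    # sort the values and key each one by its position
    indexed = (rdd
               .sortBy(lambda x: x)
               .zipWithIndex()
               .map(lambda xi: (xi[1], xi[0])))

    n = indexed.count()
    # nearest-rank definition: the value at rank ceil(p/100 * n), 1-based
    rank = max(int(ceil(percentile / 100.0 * n)), 1)
    return indexed.lookup(rank - 1)[0]
```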
If you input percentile as 50, you should obtain your required median. Let me know if there are any corner cases not accounted for.
Here is the approach I used, based on window functions (with pyspark 2.2.0).
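A sketch of what such a helper could look like, assuming a grouping column named col1 (the column names and exact mechanics are illustrative): it attaches an addMedian method to DataFrame, ranks the rows within each partition, and averages the middle value(s).

```python
from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F

def addMedian(self, col_name, median_name, part_col="col1"):
    # one window ordered by the target column, one plain partition window for aggregates
    w_order = Window.partitionBy(part_col).orderBy(col_name)
    w_all = Window.partitionBy(part_col)

    ranked = (self
              .withColumn("_rn", F.row_number().over(w_order))
              .withColumn("_cnt", F.count(F.lit(1)).over(w_all)))

    # the middle row(s) of each partition: one for an odd count, two for an even count
    is_mid = (F.col("_rn") == F.floor((F.col("_cnt") + 1) / 2)) | \
             (F.col("_rn") == F.ceil((F.col("_cnt") + 1) / 2))

    # average only the middle value(s) and broadcast the result to every row of the partition
    return (ranked
            .withColumn(median_name, F.avg(F.when(is_mid, F.col(col_name))).over(w_all))
            .drop("_rn", "_cnt"))

DataFrame.addMedian = addMedian
```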
Then call the addMedian method to calculate the median of col2:
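For example, with a hypothetical df containing columns col1 and col2:

```python
df_with_median = df.addMedian("col2", "median_col2")
df_with_median.show()
```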
Finally you can group by if needed.
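For instance, reusing the result from above:

```python
df_with_median.groupby("col1", "median_col2").count().show()
```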
I have written a function which takes a data frame as input and returns a data frame with the median as a column computed over a partition. order_col is the column for which we want to calculate the median; part_col is the level at which we want to calculate the median:
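A sketch of what such a function could look like (this version uses groupBy with percentile_approx and a join rather than a window, so the result is an approximate median; the function and column names are illustrative):

```python
import pyspark.sql.functions as F

def find_median(df, part_col, order_col):
    """Return df with an extra 'median' column: the median of order_col within each part_col group."""
    part_col = list(part_col) if isinstance(part_col, (list, tuple)) else [part_col]

    # approximate median per group, then join it back onto the original rows
    medians = (df
               .groupBy(*part_col)
               .agg(F.expr("percentile_approx({}, 0.5)".format(order_col)).alias("median")))

    return df.join(medians, on=part_col, how="left")
```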