Find median in Spark SQL for multiple double datatype columns

Posted 2019-02-17 10:33

Question:

I need to find the median of multiple columns of double datatype. Please suggest the correct approach.

Below is my sample dataset with one column. I am expecting the median value to be returned as 1 for my sample.

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

I tried the following options:

1) Hive UDAF percentile: it works only for BigInt.

2) Hive UDAF percentile_approx: it does not work as expected (returns 0.25 instead of 1).

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

+----+
| _c0|
+----+
|0.25|
+----+

3) Spark window function percent_rank: the way I see it, to find the median I would look at all rows with a percent_rank above 0.5 and pick the num value that corresponds to the max percent_rank. But this does not work in all cases, especially when the record count is even; in that case the median is the average of the two middle values in the sorted distribution.

Also, with percent_rank, because I have to find the median for multiple columns, I have to calculate it in a separate dataframe per column, which seems to me a rather complex method. Please correct me if my understanding is not right.

+---+------------+
|num|percent_rank|
+---+------------+
|0.0|         0.0|
|0.0|         0.0|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
+---+------------+
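To make the percent_rank values above concrete, here is a minimal plain-Python sketch (not Spark code). It assumes the usual window-function definition, (rank - 1) / (n - 1) with tied values sharing the lowest rank, and uses the standard library's statistics.median for the even-count averaging described above. The helper name percent_rank is only illustrative.

```python
from statistics import median


def percent_rank(values):
    """Return (value, percent_rank) pairs using (rank - 1) / (n - 1).

    Tied values share the lowest rank in the tie, as in SQL's RANK().
    """
    ordered = sorted(values)
    n = len(ordered)
    if n == 1:
        return [(ordered[0], 0.0)]
    first_rank = {}
    for position, value in enumerate(ordered, start=1):
        # Remember only the first (lowest) position at which each value appears.
        first_rank.setdefault(value, position)
    return [(v, (first_rank[v] - 1) / (n - 1)) for v in ordered]


nums = [0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
ranks = percent_rank(nums)

# statistics.median averages the two middle values when the count is even.
exact = median(nums)

print(ranks)
print(exact)
```

Because ties collapse onto one percent_rank, no row in this sample lands above 0.5, which is why filtering on percent_rank alone does not locate the median here.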

Answer 1:

Out of curiosity, which version of Apache Spark are you using? Some fixes in the Apache Spark 2.0+ releases included changes to approxQuantile.

If I run the PySpark snippet below:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

with the median calculation using approxQuantile as:

df.approxQuantile("num", [0.5], 0.25)

or

spark.sql("select percentile_approx(num, 0.5) from df").show()

the results are:

  • Spark 2.0.0: 0.25
  • Spark 2.0.1: 1.0
  • Spark 2.1.0: 1.0
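The third argument to approxQuantile is a relative error. The Spark documentation describes the guarantee as floor((p - err) * N) <= rank(x) <= ceil((p + err) * N) for the returned sample x. The plain-Python sketch below only models that bound. It assumes rank means the 1-based position in sorted order, and the helper name acceptable_values is hypothetical, not part of Spark.

```python
import math


def acceptable_values(values, p, err):
    """List the distinct values whose sorted position satisfies the rank bound.

    Assumption: rank is the 1-based position in the sorted data.
    """
    ordered = sorted(values)
    n = len(ordered)
    low = max(1, math.floor((p - err) * n))   # lowest admissible rank
    high = min(n, math.ceil((p + err) * n))   # highest admissible rank
    return sorted(set(ordered[low - 1:high]))


nums = [0.0, 0.0, 1.0, 1.0, 1.0, 1.0]

loose = acceptable_values(nums, 0.5, 0.25)  # error used in the call above
strict = acceptable_values(nums, 0.5, 0.0)  # zero error requests the exact rank

print(loose)
print(strict)
```

Under this model a relative error of 0.25 on six rows admits ranks 1 through 5, so either 0.0 or 1.0 would satisfy the bound, while an error of 0.0 pins the result to rank 3. Note that 0.25 is not an element of the column at all, so it falls outside this model.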

Note that these are approximate values (via approxQuantile), though in general this should work well. If you need the exact median, one approach is to use numpy.median. The snippet below adapts gench's Stack Overflow answer to "How to find the median in Apache Spark with Python Dataframe API?" for this df example:

from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        # median of the list of values collected for each row
        median = np.median(values)
        return round(float(median), 2)
    except Exception:
        # return null if anything is wrong with the given values
        return None

median_finder = F.udf(find_median, FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

with the output of:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+
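Since the question asks about several columns, it may help to check the UDF's logic locally before wiring it into Spark. The sketch below is plain Python, not Spark code: it swaps in the standard library's statistics.median for numpy.median, and the column names "other" and "empty" and their data are hypothetical.

```python
from statistics import median


def find_median(values):
    """Mirror of the UDF body: median rounded to 2 places, None on bad input."""
    try:
        return round(float(median(values)), 2)
    except Exception:
        # statistics.median raises on an empty list; treat that as a null result
        return None


# Hypothetical stand-in for one group's collected lists, one entry per column.
collected = {
    "num": [0.0, 0.0, 1.0, 1.0, 1.0, 1.0],
    "other": [2.5, 4.0, 1.5],
    "empty": [],
}

medians = {name: find_median(vals) for name, vals in collected.items()}
print(medians)
```

In Spark the analogous step would presumably be one collect_list per column inside a single agg call, followed by one withColumn per column using the same UDF. Keep in mind that collect_list gathers every value of a group into one row, so this suits groups of moderate size.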

Updated: Spark 1.6 Scala version using RDDs

If you are using Spark 1.6, you can calculate the median in Scala, following Eugene Zhulenev's answer to "How can I calculate the exact median with Apache Spark". Below is the code, modified to work with our example.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd: RDD[Double] = sc.parallelize(Seq(0.0, 0.0, 1.0, 1.0, 1.0, 1.0))

// sort the values and key each one by its position so it can be looked up
val sorted = rdd.sortBy(identity).zipWithIndex().map {
  case (v, idx) => (idx, v)
}

val count = sorted.count()

// even count: average the two middle values; odd count: take the middle value
val median: Double = if (count % 2 == 0) {
  val l = count / 2 - 1
  val r = l + 1
  (sorted.lookup(l).head + sorted.lookup(r).head) / 2
} else sorted.lookup(count / 2).head

with the output of:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

Note that this calculates the exact median using RDDs, i.e. you will need to convert the DataFrame column into an RDD to perform this calculation.
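For the multi-column case raised in the question, the approximate route might avoid a per-column conversion, since one query can carry a percentile_approx call per column. The sketch below only builds the query text. The helper median_query, the "_median" alias suffix, and the second column name "other" are assumptions for illustration.

```python
def median_query(table, columns, percentile=0.5):
    """Build one SELECT with a percentile_approx call per column."""
    if not columns:
        raise ValueError("at least one column is required")
    for name in [table, *columns]:
        # Accept only plain identifiers so the generated SQL stays well-formed.
        if not name.isidentifier():
            raise ValueError(f"unexpected identifier: {name!r}")
    selects = ", ".join(
        f"percentile_approx({c}, {percentile}) as {c}_median" for c in columns
    )
    return f"select {selects} from {table}"


query = median_query("df", ["num", "other"])
print(query)
```

The resulting string could then be passed to spark.sql(query).show() against a registered temp view, with the same caveat as above that percentile_approx returns an approximate value.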