可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
I have a requirement to calculate exact median on grouped data set of Double datatype in Spark using Scala.
It is different from the similar query: Find median in spark SQL for multiple double datatype columns. This question is about the finding data for grouped data, whereas the other one is about finding median on a RDD level.
Here is my sample data
scala> sqlContext.sql("select * from test").show()
+---+---+
| id|num|
+---+---+
| A|0.0|
| A|1.0|
| A|1.0|
| A|1.0|
| A|0.0|
| A|1.0|
| B|0.0|
| B|1.0|
| B|1.0|
+---+---+
Expected Answer:
+--------+
| Median |
+--------+
| 1 |
| 1 |
+--------+
I tried the following option, but no luck:
1) Hive function percentile, it worked only for BigInt.
2) Hive function percentile_approx
, but it does not work as expected (returns 0.25 vs 1).
scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show()
+----+
| _c0|
+----+
|0.25|
|0.25|
+----+
回答1:
Simplest Approach (requires Spark 2.0.1+ and not exact median)
As noted in the comments in reference to the first question Find median in Spark SQL for double datatype columns, we can use percentile_approx
to calculate median for Spark 2.0.1+. To apply this for grouped data in Apache Spark, the query would look like:
val df = Seq(("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")
df.createOrReplaceTempView("df")
spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()
with the output being:
+---+------+
| id|median|
+---+------+
| A| 1.0|
| B| 1.0|
+---+------+
Saying this, this is an approximate value (as opposed to an exact median per the question).
Calculate exact median for grouped data
There are multiple approaches so I'm sure others in SO can provide better or more efficient examples. But here's a code snippet calculate the median for grouped data in Spark (verified in Spark 1.6 and Spark 2.1):
import org.apache.spark.SparkContext._
val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)))
// Scala median function
def median(inputList: List[Double]): Double = {
val count = inputList.size
if (count % 2 == 0) {
val l = count / 2 - 1
val r = l + 1
(inputList(l) + inputList(r)).toDouble / 2
} else
inputList(count / 2).toDouble
}
// Sort the values
val setRDD = rdd.groupByKey()
val sortedListRDD = setRDD.mapValues(_.toList.sorted)
// Output DataFrame of id and median
sortedListRDD.map(m => {
(m._1, median(m._2))
}).toDF("id", "median_of_num").show()
with the output being:
+---+-------------+
| id|median_of_num|
+---+-------------+
| A| 1.0|
| B| 1.0|
+---+-------------+
There are some caveats that I should call out as this likely isn't the most efficient implementation:
- It's currently using a
groupByKey
which is not very performant. You may want to change this into a reduceByKey
instead (more information at Avoid GroupByKey)
- Using a Scala function to calculate the
median
.
This approach should work okay for smaller amounts of data but if you have millions of rows for each key, would advise utilizing Spark 2.0.1+ and using the percentile_approx
approach.
回答2:
Here is my version of PERCENTILE_COUNT function in SPARK. This can be used to find the value of median for a grouped data in Dataframe. Hope it may help someone. Feel free to provide your suggestions to improve the solution.
val PERCENTILEFLOOR = udf((maxrank: Integer, percentile: Double) => scala.math.floor(1 + (percentile * (maxrank - 1))))
val PERCENTILECEIL = udf((maxrank: Integer, percentile: Double) => scala.math.ceil(1 + (percentile * (maxrank - 1))))
val PERCENTILECALC = udf((maxrank: Integer, percentile: Double, floorVal: Double, ceilVal: Double, floorNum: Double, ceilNum: Double)
=> {
if (ceilNum == floorNum) {
floorVal
} else {
val RN = (1 + (percentile * (maxrank - 1)))
((ceilNum - RN) * floorVal) + ((RN - floorNum) * ceilVal)
} })
/** * The result of PERCENTILE_CONT is computed by linear interpolation between values after ordering them. * Using the percentile value (P) and the number of rows (N) in the aggregation group, * we compute the row number we are interested in after ordering the rows with respect to the sort specification. * This row number (RN) is computed according to the formula RN = (1+ (P*(N-1)). * The final result of the aggregate function is computed by linear interpolation between the values from rows at row numbers
* CRN = CEILING(RN) and FRN = FLOOR(RN). * * The final result will be: * * If (CRN = FRN = RN) then the result is * (value of expression from row at RN) * Otherwise the result is * (CRN - RN) * (value of expression for row at FRN) + * (RN - FRN) * (value of expression for row at CRN) * * Parameter details * * @inputDF - Dataframe for computation * @medianCol - Column for which percentile to be calculated * @grouplist - Group list for dataframe before sorting * @percentile - numeric value between 0 and 1 to express the percentile to be calculated * */
def percentile_count(inputDF: DataFrame, medianCol: String, groupList: List[String], percentile: Double): DataFrame = {
val orderList = List(medianCol)
val wSpec3 = Window.partitionBy(groupList.head, groupList.tail: _*).orderBy(orderList.head, orderList.tail: _*)
// Group, sort and rank the DF
val rankedDF = inputDF.withColumn("rank", row_number().over(wSpec3))
// Find the maximum for each group
val groupedMaxDF = rankedDF.groupBy(groupList.head, groupList.tail: _*).agg(max("rank").as("maxval"))
// CRN calculation
val ceilNumDF = groupedMaxDF.withColumn("rank", PERCENTILECEIL(groupedMaxDF("maxval"), lit(percentile))).drop("maxval")
// FRN calculation
val floorNumDF = groupedMaxDF.withColumn("rank", PERCENTILEFLOOR(groupedMaxDF("maxval"), lit(percentile)))
val ntileGroup = "rank" :: groupList
//Get the values for the CRN and FRN
val floorDF = floorNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "floorNum").withColumnRenamed(medianCol, "floorVal")
val ceilDF = ceilNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "ceilNum").withColumnRenamed(medianCol, "ceilVal")
//Get both the values for CRN and FRN in same row
val resultDF = floorDF.join(ceilDF, groupList)
val finalList = "median_" + medianCol :: groupList
// Calculate the median using the UDF PERCENTILECALC and returns the DF
resultDF.withColumn("median_" + medianCol, PERCENTILECALC(resultDF("maxval"), lit(percentile), resultDF("floorVal"), resultDF("ceilVal"), resultDF("floorNum"), resultDF("ceilNum"))).select(finalList.head, finalList.tail: _*)
}
回答3:
You can try this solution for exact median. I described spark sql solution here gist.github.
To compute exact median I use row_number() and count() functions in conjuction with a window function.
val data1 = Array( ("a", 0), ("a", 1), ("a", 1), ("a", 1), ("a", 0), ("a", 1))
val data2 = Array( ("b", 0), ("b", 1), ("b", 1))
val union = data1.union(data2)
val df = sc.parallelize(union).toDF("key", "val")
df.cache.createOrReplaceTempView("kvTable")
spark.sql("SET spark.sql.shuffle.partitions=2")
var ds = spark.sql("""
SELECT key, avg(val) as median
FROM ( SELECT key, val, rN, (CASE WHEN cN % 2 = 0 then (cN DIV 2) ELSE (cN DIV 2) + 1 end) as m1, (cN DIV 2) + 1 as m2
FROM (
SELECT key, val, row_number() OVER (PARTITION BY key ORDER BY val ) as rN, count(val) OVER (PARTITION BY key ) as cN
FROM kvTable
) s
) r
WHERE rN BETWEEN m1 and m2
GROUP BY key
""")
Spark executes and optimizes this query efficiently, since it reuses data partitioning.
scala> ds.show
+---+------+
|key|median|
+---+------+
| a| 1.0|
| b| 1.0|
+---+------+
回答4:
with high order function element_at
added in Spark 2.4. We could use with Window function, or groupBy then join back.
Sample Data
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
case class Salary(depName: String, empNo: Long, salary: Long)
val empsalary = Seq(
Salary("sales", 1, 5000),
Salary("personnel", 2, 3900),
Salary("sales", 3, 4800),
Salary("sales", 4, 4800),
Salary("personnel", 5, 3500),
Salary("develop", 7, 4200),
Salary("develop", 8, 6000),
Salary("develop", 9, 4500),
Salary("develop", 10, 5200),
Salary("develop", 11, 5200)).toDS
with Window function
val byDepName = Window.partitionBy('depName).orderBy('salary)
val df = empsalary.withColumn(
"salaries", collect_list('salary) over byDepName).withColumn(
"median_salary", element_at('salaries, (size('salaries)/2 + 1).cast("int")))
df.show(false)
with groupBy then join back
val dfMedian = empsalary.groupBy("depName").agg(
sort_array(collect_list('salary)).as("salaries")).select(
'depName,
element_at('salaries, (size('salaries)/2 + 1).cast("int")).as("median_salary"))
empsalary.join(dfMedian, "depName").show(false)
回答5:
If you don't wanna use spark-sql (as I do) you can use the cume_dist
function.
See example below:
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
val df = (1 to 10).toSeq.toDF
val win = Window.
partitionBy(F.col("value")).
orderBy(F.col("value")).
rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show
Results:
+-----+---+
|value| c|
+-----+---+
| 1|0.1|
| 2|0.2|
| 3|0.3|
| 4|0.4|
| 5|0.5|
| 6|0.6|
| 7|0.7|
| 8|0.8|
| 9|0.9|
| 10|1.0|
+-----+---+
The median is the value for which df("c")
equals 0.5.
I hope it helps, Elior.
回答6:
Just to add to Elior's answer and responding to Erkan, the reason the output is 1.0 for each column is that the partitionBy(F.col("value")) partitions the data as a single row per partition such that when the window calculates cume_dist
it does it for a single value and results with 1.0.
Removing the partitionBy(F.col("value")) from the window operation results in the expected quantiles.
Start of Elior's Answer
If you don't wanna use spark-sql (as I do) you can use the cume_dist
function.
See example below:
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
val df = (1 to 10).toSeq.toDF
val win = Window.
partitionBy(F.col("value")). //Remove this line
orderBy(F.col("value")).
rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show
Results:
+-----+---+
|value| c|
+-----+---+
| 1|0.1|
| 2|0.2|
| 3|0.3|
| 4|0.4|
| 5|0.5|
| 6|0.6|
| 7|0.7|
| 8|0.8|
| 9|0.9|
| 10|1.0|
+-----+---+
The median is the value for which df("c")
equals 0.5. I hope it helps, Elior.
End of Elior's Answer
Window defined without partitionBy:
val win = Window.
orderBy(F.col("value")).
rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show