Aggregate a spark dataframe based on and before date

Posted 2019-08-17 00:55

Question:

I have a DataFrame with a start_date column of date type. For each date, I have to count the unique values of column1 over all rows whose start_date is before or equal to that date. Following is an input DataFrame:

column1   column2  start_date
id1       val1     2018-03-12
id1       val2     2018-03-12
id2       val3     2018-03-12 
id3       val4     2018-03-12
id4       val5     2018-03-11
id4       val6     2018-03-11
id5       val7     2018-03-11
id5       val8     2018-03-11 
id6       val9     2018-03-10 

Now I have to convert it into the following:

start_date     count
2018-03-12    6
2018-03-11    3
2018-03-10    1 

This is what I am doing, which is not an efficient way:

  1. finding all distinct start_dates and storing them as a list
  2. looping over the list and generating output for each start_date
  3. combining all outputs into one DataFrame (see the sketch below)
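
Roughly, what this looks like in code (a sketch only, using df for the input DataFrame and the column names from the table above; it launches one Spark job per distinct date):

import org.apache.spark.sql.functions.{col, countDistinct, lit}

// 1. collect the distinct dates to the driver
val dates = df.select("start_date").distinct().collect().map(_.getDate(0))

// 2. one filtered aggregation per date -- a separate Spark job each time
val perDate = dates.map { d =>
  df.filter(col("start_date") <= lit(d))
    .agg(countDistinct("column1").alias("count"))
    .withColumn("start_date", lit(d))
}

// 3. union the per-date results into one DataFrame
val result = perDate.reduce(_ union _)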

Is there a better way of doing it without looping?

Answer 1:

You can combine standard aggregation with a window function, but the second stage won't be distributed:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

// the $"col" syntax needs `import spark.implicits._` outside of spark-shell
df
 .groupBy($"start_date")
 // approximate distinct count of column1 per start_date
 .agg(approx_count_distinct($"column1").alias("count"))
 // running total over dates; the un-partitioned window is evaluated on a single partition
 .withColumn(
   "cumulative_count", sum($"count").over(Window.orderBy($"start_date")))

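On the sample data above, the per-date approximate distinct counts are 1, 2 and 3, so the running total should come out roughly as:

start_date    count   cumulative_count
2018-03-10    1       1
2018-03-11    2       3
2018-03-12    3       6

(countDistinct can be substituted for approx_count_distinct when an exact count is needed.)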

Answer 2:

Try something like the following:

df.groupBy("start_date").agg(countDistinct("column1"))
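
This alone returns the distinct count per start_date (1, 2 and 3 on the sample data), not the cumulative counts in the expected output. A minimal sketch of adding a running total, along the lines of the first answer but with an exact count:

import org.apache.spark.sql.functions.{countDistinct, sum}
import org.apache.spark.sql.expressions.Window

// exact distinct count per date, then a running total over ascending start_date;
// like the first answer, this assumes each column1 value occurs under a single start_date,
// and the un-partitioned window is evaluated on one partition
df.groupBy("start_date")
  .agg(countDistinct("column1").alias("cnt"))
  .withColumn("count", sum("cnt").over(Window.orderBy("start_date")))
  .select("start_date", "count")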

Explore this pattern.

Check countDistinct - https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.functions

Or use a Spark time window - example:

// needs `import org.apache.spark.sql.functions.{window, mean}` and `import spark.implicits._`
val df = ... // schema => time: TimestampType, stockId: StringType, price: DoubleType

// mean price per stockId within tumbling 1-minute event-time windows
df.groupBy(window($"time", "1 minute"), $"stockId")
  .agg(mean("price"))
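
A self-contained sketch of the same pattern on made-up tick data (assumes a SparkSession named spark is in scope):

import org.apache.spark.sql.functions.{window, mean}
import spark.implicits._   // for toDF and the $"col" syntax

// hypothetical ticks: event time, stock id, price
val ticks = Seq(
  ("2018-03-12 09:00:10", "A", 10.0),
  ("2018-03-12 09:00:40", "A", 11.0),
  ("2018-03-12 09:01:05", "A", 12.0)
).toDF("time", "stockId", "price")
  .withColumn("time", $"time".cast("timestamp"))

// mean price per stockId within tumbling 1-minute event-time windows
ticks
  .groupBy(window($"time", "1 minute"), $"stockId")
  .agg(mean("price"))
  .show(false)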