I have data from 1st Jan 2017 to 7th Jan 2017 (one week) and I want a weekly aggregate. I used the window function in the following manner:
val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
.agg(sum("Value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
The data in my dataframe looks like this:
DateTime,value
2017-01-01T00:00:00.000+05:30,1.2
2017-01-01T00:15:00.000+05:30,1.30
--
2017-01-07T23:30:00.000+05:30,1.43
2017-01-07T23:45:00.000+05:30,1.4
I am getting this output:
2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
It shows that my window starts from 29th Dec 2016, but the actual data starts from 1st Jan 2017. Why is this offset occurring?
By default the windows are aligned to 1970-01-01 00:00:00 UTC, which is why your 7-day windows start on Thursday 2016-12-29 rather than on the first day of your data. For tumbling windows like this it is possible to set an offset to the starting time; more information can be found in the blog here. A sliding window is used for this, but by setting both the window duration and the sliding duration to the same value, it behaves exactly like a tumbling window with a starting offset.
The syntax is as follows:
window(column, window duration, sliding duration, starting offset)
With your values I found that an offset of 64 hours gives a starting time of 2017-01-01 00:00:00 (the exact offset depends on the time zone your timestamps are interpreted in, since the windows are aligned to the epoch in UTC):
val data = Seq(("2017-01-01 00:00:00",1.0),
("2017-01-01 00:15:00",2.0),
("2017-01-08 23:30:00",1.43))
val df = data.toDF("DateTime","value")
.withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))
val df2 = df
.groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
.agg(sum("value") as "aggregate_sum")
.select("window.start", "window.end", "aggregate_sum")
This gives the following resulting dataframe:
+-------------------+-------------------+-------------+
| start| end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00| 3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00| 1.43|
+-------------------+-------------------+-------------+
The solution with the Python API looks a bit more intuitive, since there the window function has the following signature:
window(timeColumn, windowDuration, slideDuration=None, startTime=None)
see:
https://spark.apache.org/docs/2.4.0/api/python/_modules/pyspark/sql/functions.html
The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15..., provide startTime as 15 minutes.
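As a minimal sketch of that hourly example (the dataframe and column names here are made up purely for illustration), the same startTime option shifts hourly windows to start 15 minutes past the hour:
from datetime import datetime
from pyspark.sql.functions import sum, window

# Hypothetical sample data, just to illustrate the 15-minute offset
events = spark.createDataFrame(
    [(datetime(2017, 1, 1, 12, 20), 1.0),
     (datetime(2017, 1, 1, 13, 20), 2.0)],
    ["ts", "value"])

# Hourly tumbling windows shifted by 15 minutes: 12:15-13:15, 13:15-14:15, ...
hourly = (events
    .groupBy(window("ts", "1 hour", startTime="15 minutes"))
    .agg(sum("value").alias("aggregate_sum")))
hourly.show(truncate=False)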
There is no need for a workaround with the sliding duration here; I used a 3-day "delay" as startTime to match the desired tumbling window:
from datetime import datetime
from pyspark.sql.functions import sum, window

df_ex = spark.createDataFrame(
    [(datetime(2017, 1, 1, 0, 0), 1.),
     (datetime(2017, 1, 1, 0, 15), 2.),
     (datetime(2017, 1, 8, 23, 30), 1.43)],
    ["Datetime", "value"])

# startTime="3 day" shifts the epoch-aligned weekly windows from Thursday
# to Sunday, so they start at 2017-01-01 00:00:00 for this data
weekly_ex = (df_ex
    .groupBy(window("Datetime", "1 week", startTime="3 day"))
    .agg(sum("value").alias("aggregate_sum")))

weekly_ex.show(truncate=False)
For the same result:
+------------------------------------------+-------------+
|window |aggregate_sum|
+------------------------------------------+-------------+
|[2017-01-01 00:00:00, 2017-01-08 00:00:00]|3.0 |
|[2017-01-08 00:00:00, 2017-01-15 00:00:00]|1.43 |
+------------------------------------------+-------------+
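If you also want separate start and end columns, as in the Scala example above, the fields of the window struct can be selected in the same way (a small follow-up on the weekly_ex dataframe from above):
# Flatten the window struct into explicit start/end columns
weekly_ex.select("window.start", "window.end", "aggregate_sum").show(truncate=False)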