I have a table that contains records of customer purchases. I need to assign each purchase to a datetime window, where one window is 8 days. So if I made a purchase today and another one 5 days later, both purchases are in window number 1; but if I made one purchase today and the next one 8 days later, the first purchase falls in window 1 and the last purchase in window 2. Here is my sample data:
create temporary table transactions
(client_id int,
transaction_ts datetime,
store_id int);
insert into transactions values
(1,'2018-06-01 12:17:37', 1),
(1,'2018-06-02 13:17:37', 2),
(1,'2018-06-03 14:17:37', 3),
(1,'2018-06-09 10:17:37', 2),
(2,'2018-06-02 10:17:37', 1),
(2,'2018-06-02 13:17:37', 2),
(2,'2018-06-08 14:19:37', 3),
(2,'2018-06-16 13:17:37', 2),
(2,'2018-06-17 14:17:37', 3);
The window is 8 days. The problem is that I don't understand how to tell dense_rank() OVER (PARTITION BY ...) to look at the datetime and build an 8-day window.
As a result I need something like this:
1,'2018-06-01 12:17:37', 1,1
1,'2018-06-02 13:17:37', 2,1
1,'2018-06-03 14:17:37', 3,1
1,'2018-06-09 10:17:37', 2,2
2,'2018-06-02 10:17:37', 1,1
2,'2018-06-02 13:17:37', 2,1
2,'2018-06-08 14:19:37', 3,2
2,'2018-06-16 13:17:37', 2,3
2,'2018-06-17 14:17:37', 3,3
Any idea how to get this? I can run it in MySQL or Spark SQL, but my MySQL version doesn't support PARTITION BY (no window functions).
I still cannot find a solution! Any help is appreciated.
Most likely you can solve this in Spark SQL using its time window function combined with a partitioned window function:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val purchases = Seq(
  (1, "2018-06-01 12:17:37", 1),
  (1, "2018-06-02 13:17:37", 2),
  (1, "2018-06-03 14:17:37", 3),
  (1, "2018-06-09 10:17:37", 2),
  (2, "2018-06-02 10:17:37", 1),
  (2, "2018-06-02 13:17:37", 2),
  (2, "2018-06-08 14:19:37", 3),
  (2, "2018-06-16 13:17:37", 2),
  (2, "2018-06-17 14:17:37", 3)
).toDF("client_id", "transaction_ts", "store_id")
purchases.show(false)
+---------+-------------------+--------+
|client_id|transaction_ts |store_id|
+---------+-------------------+--------+
|1 |2018-06-01 12:17:37|1 |
|1 |2018-06-02 13:17:37|2 |
|1 |2018-06-03 14:17:37|3 |
|1 |2018-06-09 10:17:37|2 |
|2 |2018-06-02 10:17:37|1 |
|2 |2018-06-02 13:17:37|2 |
|2 |2018-06-08 14:19:37|3 |
|2 |2018-06-16 13:17:37|2 |
|2 |2018-06-17 14:17:37|3 |
+---------+-------------------+--------+
val groupedByTimeWindow = purchases
  .groupBy($"client_id", window($"transaction_ts", "8 days"))
  .agg(
    collect_list("transaction_ts").as("transaction_tss"),
    collect_list("store_id").as("store_ids"))

// number each client's 8-day windows in chronological order
val windowByClient = Window.partitionBy($"client_id").orderBy($"window.start")
val withWindowNumber = groupedByTimeWindow.withColumn("window_number", row_number().over(windowByClient))
withWindowNumber.orderBy("client_id", "window.start").show(false)
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|client_id|window |transaction_tss |store_ids|window_number|
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|1 |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-01 12:17:37, 2018-06-02 13:17:37, 2018-06-03 14:17:37]|[1, 2, 3]|1 |
|1 |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-09 10:17:37] |[2] |2 |
|2 |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-02 10:17:37, 2018-06-02 13:17:37] |[1, 2] |1 |
|2 |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-08 14:19:37] |[3] |2 |
|2 |[2018-06-13 17:00:00.0,2018-06-21 17:00:00.0]|[2018-06-16 13:17:37, 2018-06-17 14:17:37] |[2, 3] |3 |
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
If you need one row per transaction again, you may explode the list elements from store_ids or transaction_tss, as sketched below.
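A minimal sketch of that explode step, assuming Spark 2.4+ for arrays_zip (which keeps each timestamp paired with its store_id; on older versions you would need posexplode on each array instead):

val exploded = withWindowNumber
  .withColumn("pair", explode(arrays_zip($"transaction_tss", $"store_ids")))
  .select(
    $"client_id",
    $"pair.transaction_tss".as("transaction_ts"),
    $"pair.store_ids".as("store_id"),
    $"window_number")

exploded.orderBy("client_id", "transaction_ts").show(false)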
Hope it helps!
I didn't use the proposed Spark solution; I did this with pure SQL logic and a cursor. It's not very efficient, but I needed the job done.
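For reference, a cursor-free pure-SQL sketch (an alternative, not the cursor approach above): bucket each transaction into fixed 8-day bins with TO_DAYS and number each client's distinct bins via a correlated subquery. It reproduces the expected output on this sample, but the bin boundaries are aligned to TO_DAYS' day zero, so they can differ from Spark's epoch-aligned windows on other data; it also assumes transactions is a regular table, since MySQL cannot reference a TEMPORARY table twice in one query:

select t.client_id, t.transaction_ts, t.store_id,
       -- count the client's distinct 8-day buckets up to and including this one
       (select count(distinct floor(to_days(t2.transaction_ts) / 8))
        from transactions t2
        where t2.client_id = t.client_id
          and floor(to_days(t2.transaction_ts) / 8) <= floor(to_days(t.transaction_ts) / 8)
       ) as window_number
from transactions t
order by t.client_id, t.transaction_ts;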