I have data that looks like this:
```
userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769
```
I would like to add a new boolean column that is true when there are two or more userid values within a 5-minute window at the same location_point. My idea was to use the `lag` function to look up over a window partitioned by userid, with a range between the current timestamp and the next 5 minutes:
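```python
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

# days(5) = 5 * 60 * 5 = 1500 seconds (25 minutes), which is the 1500 in the error below
days = lambda i: i * 60*5

# Per-user window ordered by epoch seconds, from the current row to days(5) seconds later
windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))
# Previous location of the same user
last_location_point = F.lag(col("location_point"), 1).over(windowSpec)
# `output` is the DataFrame holding the data above
visitCheck = (last_location_point == output.location_point)
output = output.withColumn("visit_check", visitCheck).select("userid", "eventtime", "location_point", "visit_check")
```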
This code raises an `AnalysisException` when I call `rangeBetween`:
```
AnalysisException: u'Window Frame RANGE BETWEEN CURRENT ROW AND 1500 FOLLOWING must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;
```
Do you know any way to tackle this problem?
Starting from your data, first add a column holding the timestamp in seconds:
Next, define a window partitioned by location_point, ordered by timestamp, with a range from 300 seconds before the current row up to the current row. Counting the rows in this window gives the number of events at that location in the last 5 minutes, which we store in a column named occurrences_in_5_min:
Now you can add the desired column, which is True when the number of occurrences at that location in the last 5 minutes is strictly greater than 1:
`rangeBetween` just doesn't make sense for a non-aggregate function like `lag`. `lag` always takes one specific row, given by its `offset` argument, so specifying a frame is pointless. To get a window over a time series, you can use `window` grouping with standard aggregates. You can pass additional arguments to `window` to change the slide duration.
You can do something similar with window functions if you partition by `location_point`: