Is there an example somewhere, or can someone explain how to use Kinesis Analytics to construct real-time sessions (i.e., sessionization)?
It is mentioned that this is possible here: https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/ in the discussion of custom windows, but the post does not give an example.
Typically this is done in SQL using the LAG function so you can compute the time difference between consecutive rows. This post: https://blog.modeanalytics.com/finding-user-sessions-sql/ describes how to do it with conventional (non-streaming) SQL. However, I don't see support for the LAG function in Kinesis Analytics.
In particular, I would love two examples. Assume that both take as input a stream consisting of a user_id and a timestamp. Define a session as a sequence of events from the same user separated by less than 5 minutes.
1) The first outputs a stream that has the additional columns event_count and session_start_timestamp. Every time an event comes in, this should output an event with these two additional columns.
2) The second example would be a stream that outputs a single event per session once the session has ended (i.e., 5 minutes have passed with no data from a user). This event would have userId, start_timestamp, end_timestamp, and event_count.
Is this possible with Kinesis Analytics?
Here is an example of doing this with Apache Spark: https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html
But I would love to do this with one (or two) Kinesis Analytics streams.
You can do this using Drools by creating the following logic:
Types:
package com.test;

import java.util.List;

declare EventA
    @role( event )
    userId: String;
    seen: boolean;
end

declare SessionStart
    userId: String;
    timestamp: long;
    events: List;
end

declare SessionEnd
    userId: String;
    timestamp: long;
    numOfEvents: int;
end

declare SessionNotification
    userId: String;
    currentNumOfEvents: int;
end
Rules:
package com.test;

import java.util.List;
import java.util.ArrayList;

rule "Start session"
when
    // for any EventA
    $a : EventA() from entry-point events
    // check session is not started for this userId
    not (exists(SessionStart(userId == $a.userId)))
then
    modify($a){setSeen(true);}
    List events = new ArrayList();
    events.add($a);
    insert(new SessionStart($a.getUserId(), System.currentTimeMillis(), events));
end

rule "join session"
when
    // for every new EventA
    $a : EventA(seen == false) from entry-point events
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    $session.getEvents().add($a);
    insertLogical(new SessionNotification($a.getUserId(), $session.getEvents().size()));
    modify($a) {setSeen(true);}
end

rule "End session"
    // if session timed out, clean up first
    salience 5
when
    // for any EventA
    $a : EventA() from entry-point events
    // check there is no following EventA with same userId within 30 seconds
    // (30s is this example's timeout; the question's 5-minute gap would be 5m)
    not (exists(EventA(this != $a, userId == $a.userId, this after[0, 30s] $a)
        from entry-point events))
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    insertLogical(new SessionEnd($a.getUserId(), System.currentTimeMillis(),
        $session.getEvents().size()));
    // cleanup
    for (Object $x : $session.getEvents())
        delete($x);
    delete($session);
end
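To make the intent of the three rules easier to follow, here is a minimal Python sketch of the same state machine. It is not Drools and not part of the answer above: the class name Sessionizer, the tuple layouts, and the explicit expire() call are my own assumptions. Drools would fire the "End session" rule from its own clock, whereas this sketch only notices a timeout when a later event arrives or when expire() is called.

```python
TIMEOUT_SECONDS = 30  # same timeout as the after[0, 30s] constraint in the rules


class Sessionizer:
    """Hypothetical helper that mirrors the start / join / end rules."""

    def __init__(self, timeout=TIMEOUT_SECONDS):
        self.timeout = timeout
        self.open = {}  # user_id -> [start_ts, last_ts, event_count]

    def expire(self, now):
        """Close every session whose last event is more than `timeout` old."""
        ended = []
        for user_id, (start, last, count) in list(self.open.items()):
            if now - last > self.timeout:
                ended.append(("end", user_id, start, last, count))
                del self.open[user_id]
        return ended

    def on_event(self, user_id, ts):
        """Process one event; return the records it produces, in order."""
        out = self.expire(ts)  # like "End session", which has higher salience
        if user_id not in self.open:
            # "Start session": no open session for this user
            self.open[user_id] = [ts, ts, 1]
            out.append(("start", user_id, ts, 1))
        else:
            # "join session": add the event and report the running count
            session = self.open[user_id]
            session[1] = ts
            session[2] += 1
            out.append(("notify", user_id, session[0], session[2]))
        return out


s = Sessionizer()
first = s.on_event("u1", 0)
second = s.on_event("u1", 10)
third = s.on_event("u1", 100)   # 90 s gap: old session ends, new one starts
flushed = s.expire(200)
```

The "notify" records correspond to the first output asked for in the question (running count plus session start), and the "end" records to the second (one record per finished session).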
You can author Drools rules for Kinesis Analytics with this service.
There is support for LAG now on Kinesis Analytics. You can see it on the documentation page: http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-lag.html. I have actually used it for a use case similar to the one you describe.
With the help of an AWS Solution Architect I was able to sessionize with this strategy:
Source stream sample:
epoc_time: INTEGER
uuid: CHAR(6)
epoc_time uuid
1530000000 myuuid
1530000001 myuuid
1530000002 myuuid
1530000003 myuuid
1530002000 myuuid
1530002001 myuuid
1530002002 myuuid
1530002003 myuuid
Step 1: Get the time difference between the current and preceding row. If that difference is greater than your session inactivity threshold (in my case I'll choose 15 min / 900 seconds), or if there is no preceding row, stamp the row with its own epoc_time as the start of a new session.
CASE WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) > 900 THEN epoc_time
WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) IS NULL THEN epoc_time
ELSE NULL END as session
epoc_time uuid session
1530000000 myuuid 1530000000
1530000001 myuuid
1530000002 myuuid
1530000003 myuuid
1530002000 myuuid 1530002000
1530002001 myuuid
1530002002 myuuid
1530002003 myuuid
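For anyone who wants to check the Step 1 logic outside Kinesis, here is a rough Python sketch of the same marking rule. It is only an illustration: the function name mark_session_starts and the in-memory dictionary are my own assumptions, and the dictionary merely plays the role that LAG plays per uuid in the SQL.

```python
SESSION_GAP_SECONDS = 900  # same 15-minute threshold as the CASE expression


def mark_session_starts(rows):
    """Return (epoc_time, uuid, session) tuples.

    session is the row's own epoc_time when the row starts a new session
    (no previous row for that uuid, or a gap larger than the threshold),
    and None otherwise.
    """
    last_seen = {}  # uuid -> previous epoc_time, standing in for LAG(epoc_time, 1)
    marked = []
    for epoc_time, uuid in rows:
        previous = last_seen.get(uuid)
        if previous is None or epoc_time - previous > SESSION_GAP_SECONDS:
            session = epoc_time
        else:
            session = None
        marked.append((epoc_time, uuid, session))
        last_seen[uuid] = epoc_time
    return marked


sample = [
    (1530000000, "myuuid"), (1530000001, "myuuid"),
    (1530000002, "myuuid"), (1530000003, "myuuid"),
    (1530002000, "myuuid"), (1530002001, "myuuid"),
    (1530002002, "myuuid"), (1530002003, "myuuid"),
]
marked = mark_session_starts(sample)
```

Running it on the sample rows stamps only the first and fifth rows, which matches the table above.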
Step 2: Grab the last non-null value in the session column, windowed by the uuid, and combine it with the uuid to create a unique session ID. I chose the range as the default retention period for Kinesis (24 hours).
CAST(LAST_VALUE(session) IGNORE NULLS OVER (PARTITION BY uuid RANGE INTERVAL '24' HOUR PRECEDING) as CHAR(10))
|| uuid as sessionId,
epoc_time uuid session sessionId
1530000000 myuuid 1530000000 1530000000myuuid
1530000001 myuuid 1530000000myuuid
1530000002 myuuid 1530000000myuuid
1530000003 myuuid 1530000000myuuid
1530002000 myuuid 1530002000 1530002000myuuid
1530002001 myuuid 1530002000myuuid
1530002002 myuuid 1530002000myuuid
1530002003 myuuid 1530002000myuuid
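The Step 2 forward fill can be sketched in Python as well. Again this is just an illustration under my own assumptions: the function name assign_session_ids is made up, and the sketch keeps the last session start per uuid indefinitely instead of modelling the 24-hour RANGE window.

```python
def assign_session_ids(marked_rows):
    """Carry the last non-null session value forward per uuid.

    Input rows are (epoc_time, uuid, session); output rows are
    (epoc_time, uuid, session_id), where session_id is the session start
    concatenated with the uuid, as in the SQL expression above.
    """
    current_start = {}  # uuid -> most recent non-null session value
    result = []
    for epoc_time, uuid, session in marked_rows:
        if session is not None:
            current_start[uuid] = session
        start = current_start.get(uuid)
        session_id = None if start is None else f"{start}{uuid}"
        result.append((epoc_time, uuid, session_id))
    return result


step1_output = [
    (1530000000, "myuuid", 1530000000), (1530000001, "myuuid", None),
    (1530000002, "myuuid", None), (1530000003, "myuuid", None),
    (1530002000, "myuuid", 1530002000), (1530002001, "myuuid", None),
    (1530002002, "myuuid", None), (1530002003, "myuuid", None),
]
with_ids = assign_session_ids(step1_output)
```

Each row ends up with the same session ID as in the table above.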
Final SQL could look something like this:
CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    epoc_time INTEGER,
    uuid CHAR(6),
    session INTEGER
);

CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
    epoc_time INTEGER,
    uuid CHAR(6),
    sessionId CHAR(16)
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM
    epoc_time,
    uuid,
    CASE WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) > 900 THEN epoc_time
         WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) IS NULL THEN epoc_time
         ELSE NULL
    END as session
FROM "SOURCE_SQL_STREAM_001";

CREATE OR REPLACE PUMP "STREAM_PUMP2" AS INSERT INTO "DESTINATION_STREAM"
SELECT STREAM
    epoc_time,
    uuid,
    CAST(LAST_VALUE(session) IGNORE NULLS OVER (PARTITION BY uuid RANGE INTERVAL '24' HOUR PRECEDING) as CHAR(10)) || uuid as sessionId
FROM "INTERMEDIATE_SQL_STREAM";
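The SQL above tags every event with a sessionId, which covers the per-event output from the question. For the second output (one record per session), a consumer of the destination stream could roll the rows up by sessionId. Here is a rough Python sketch of that roll-up. It is my own assumption rather than something Kinesis Analytics does for you, summarize_sessions is a made-up name, and it works on a finished batch of rows, so it does not decide when a session has ended.

```python
def summarize_sessions(rows):
    """Group (epoc_time, uuid, session_id) rows into one summary per session."""
    summaries = {}
    for epoc_time, uuid, session_id in rows:
        summary = summaries.get(session_id)
        if summary is None:
            # first row seen for this session
            summaries[session_id] = {
                "uuid": uuid,
                "start": epoc_time,
                "end": epoc_time,
                "event_count": 1,
            }
        else:
            summary["start"] = min(summary["start"], epoc_time)
            summary["end"] = max(summary["end"], epoc_time)
            summary["event_count"] += 1
    return summaries


destination_rows = [
    (1530000000, "myuuid", "1530000000myuuid"),
    (1530000001, "myuuid", "1530000000myuuid"),
    (1530000002, "myuuid", "1530000000myuuid"),
    (1530000003, "myuuid", "1530000000myuuid"),
    (1530002000, "myuuid", "1530002000myuuid"),
    (1530002001, "myuuid", "1530002000myuuid"),
    (1530002002, "myuuid", "1530002000myuuid"),
    (1530002003, "myuuid", "1530002000myuuid"),
]
summaries = summarize_sessions(destination_rows)
```

In a live stream you would still need some timeout (for example, no new row for a sessionId within the inactivity threshold) before emitting a summary as final.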