PySpark - Combining Session Data without an Explicit Session Key

Posted 2019-08-09 05:45

Question:

I am trying to aggregate session data without a true session "key" in PySpark. I have data where an individual is detected in an area at a specific time, and I want to aggregate that into a duration spent in each area during a specific visit (see below).

The tricky part here is that I want to infer the time someone exits each area as the time they are detected in the next area. This means that I will need to use the start time of the next area ID as the end time for any given area ID. Area IDs can also show up more than once for the same individual.

I had an implementation of this in MapReduce where I iterated over all rows, accumulating the time until a new AreaID or Individual was detected, and then output the record. Is there a way to do something similar in Spark? Is there a better way to approach the problem?

Also of note, I do not want to output a record unless the individual has been detected in another area (e.g. IndividualY, AreaT below).

I have a dataset in the following format:

Individual  AreaID  Datetime of Detection
IndividualX AreaQ   1/7/2015 0:00
IndividualX AreaQ   1/7/2015 1:00
IndividualX AreaW   1/7/2015 3:00
IndividualX AreaQ   1/7/2015 4:00
IndividualY AreaZ   2/7/2015 4:00
IndividualY AreaZ   2/7/2015 5:00
IndividualY AreaW   2/7/2015 6:00
IndividualY AreaT   2/7/2015 7:00

I would like the desired output of:

Individual  AreaID  Start_Time      End_Time        Duration (minutes)
IndividualX AreaQ   1/7/2015 0:00   1/7/2015 3:00   180
IndividualX AreaW   1/7/2015 3:00   1/7/2015 4:00   60
IndividualY AreaZ   2/7/2015 4:00   2/7/2015 6:00   120
IndividualY AreaW   2/7/2015 6:00   2/7/2015 7:00   60

Answer 1:

It is not a particularly pretty solution, but you can use DataFrames and window functions. Assuming your input looks like this:

rdd = sc.parallelize([
    ("IndividualX", "AreaQ",  "1/7/2015 0:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 1:00"),
    ("IndividualX", "AreaW",  "1/7/2015 3:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 5:00"),
    ("IndividualY", "AreaW",  "2/7/2015 6:00"),
    ("IndividualY", "AreaT",  "2/7/2015 7:00")
])

First we have to convert it to a DataFrame:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

row = Row("individual", "area_id", "datetime")
fmt = "%d/%m/%Y %H:%M"
df = rdd.map(lambda r: row(r[0], r[1], datetime.strptime(r[2], fmt))).toDF()

Next, let's define a window:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window().partitionBy("individual").orderBy("datetime")

And temporary columns:

p_area_id = f.lag("area_id").over(w) # Previous area

ind = f.sum((
    p_area_id.isNull() | # No previous observation
    (p_area_id != f.col("area_id")) # Area changed
).cast("integer")).over(w)
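
For the sample data, this indicator would label each visit of an individual with a running number; e.g. for IndividualX:

area_id  datetime             ind
AreaQ    2015-07-01 00:00:00  1
AreaQ    2015-07-01 01:00:00  1
AreaW    2015-07-01 03:00:00  2
AreaQ    2015-07-01 04:00:00  3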

Using the indicator defined above, we can choose the minimum timestamp for each visit to an area:

tmp = (df
   .withColumn("ind", ind)
   .groupBy("individual", "area_id", "ind")
   .agg(f.min("datetime").alias("datetime"))
   .drop("ind"))
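
For the sample data, tmp would then contain one row per visit with the earliest detection time of that visit, e.g. for IndividualX: (AreaQ, 2015-07-01 00:00), (AreaW, 2015-07-01 03:00), (AreaQ, 2015-07-01 04:00). The last visit of each individual (such as IndividualY in AreaT) is still present here and is only dropped in the next step.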

Finally, we can define the target columns:

end_time = f.lead(f.col("datetime")).over(w)

duration = (
    f.col("end_time").cast("integer") - f.col("datetime").cast("integer")) / 60

and build the output DataFrame:

result = (tmp
    .withColumn("end_time", end_time)
    .where(f.col("end_time").isNotNull())
    .withColumn("duration", duration)
    .withColumnRenamed("datetime", "start_time"))

And the output (the where clause on end_time drops the last visit of each individual, e.g. IndividualY in AreaT, as required):

+-----------+-------+--------------------+--------------------+--------+
| individual|area_id|          start_time|            end_time|duration|
+-----------+-------+--------------------+--------------------+--------+
|IndividualX|  AreaQ|2015-07-01 00:00:...|2015-07-01 03:00:...|   180.0|
|IndividualX|  AreaW|2015-07-01 03:00:...|2015-07-01 04:00:...|    60.0|
|IndividualY|  AreaZ|2015-07-02 04:00:...|2015-07-02 06:00:...|   120.0|
|IndividualY|  AreaW|2015-07-02 06:00:...|2015-07-02 07:00:...|    60.0|
+-----------+-------+--------------------+--------------------+--------+

If you prefer plain RDDs you can reshape to something like this:

(individual, (area_id, datetime))

and then groupByKey and perform the required operations locally, as in the sketch below.
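
A minimal sketch of that RDD approach, reusing the rdd and fmt defined above (the helper name sessionize is just illustrative, not part of any API):

def sessionize(records):
    # records: all (area_id, datetime) detections for a single individual
    ordered = sorted(records, key=lambda r: r[1])
    visits = []
    start_area, start_time = ordered[0]
    for area, ts in ordered[1:]:
        if area != start_area:
            # Close the current visit at the time the next area is detected
            visits.append((start_area, start_time, ts,
                           (ts - start_time).total_seconds() / 60))
            start_area, start_time = area, ts
    # The last visit has no end time, so it is never emitted
    return visits

sessions = (rdd
    .map(lambda r: (r[0], (r[1], datetime.strptime(r[2], fmt))))
    .groupByKey()
    .flatMapValues(sessionize))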



Answer 2:

Zero323's solution works great, but I wanted to post an RDD implementation as well. I think this will be helpful for people trying to translate streaming MapReduce logic to PySpark. My implementation basically maps each key (an individual, in this case) to the list of streaming values associated with that key (area/time pairs), then iterates over that list to handle the iterative part; the rest is just normal reducing by key and mapping.


    from pyspark import SparkContext, SparkFiles, SparkConf
    from datetime import datetime

    conf = SparkConf()
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(["IndividualX|AreaQ|1/7/2015 0:00",
                          "IndividualX|AreaQ|1/7/2015 1:00",
                          "IndividualX|AreaW|1/7/2015 3:00",
                          "IndividualX|AreaQ|1/7/2015 4:00",
                          "IndividualY|AreaZ|2/7/2015 4:00",
                          "IndividualY|AreaZ|2/7/2015 5:00",
                          "IndividualY|AreaW|2/7/2015 6:00",
                          "IndividualY|AreaT|2/7/2015 7:00"])

    def splitReduce(x):
        # "Individual|Area|Datetime" -> (individual, [[datetime_string, area_id]])
        y = x.split('|')
        return (str(y[0]), [[str(y[2]), str(y[1])]])

    def resultSet(x):
        # Sort this individual's detections chronologically; parse the timestamps
        # rather than comparing the raw strings, which would mis-order hours >= 10
        processlist = sorted(x[1], key=lambda d: datetime.strptime(d[0], '%d/%m/%Y %H:%M'))

        result = []
        start_area = processlist[0][1]
        start_date = datetime.strptime(processlist[0][0], '%d/%m/%Y %H:%M')
        dur = 0

        if len(processlist) > 1:

            for datearea in processlist[1::]:

                end_date = datetime.strptime(datearea[0],'%d/%m/%Y %H:%M')
                end_area = datearea[1]

                dur = (end_date-start_date).total_seconds()/60

                if start_area != end_area:
                    result.append([start_area,start_date,end_date,dur])
                    start_date = datetime.strptime(datearea[0], '%d/%m/%Y %H:%M')
                    start_area = datearea[1]
                    dur = 0

        return (x[0],result)

    def finalOut(x):
        # individual|area_id|start_time|end_time|duration_in_minutes
        return str(x[0]) + '|' + str(x[1][0]) + '|' + str(x[1][1]) + '|' + str(x[1][2]) + '|' + str(x[1][3])

    footfall = rdd\
    .map(lambda x: splitReduce(x))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: resultSet(x))\
    .flatMapValues(lambda x: x)\
    .map(lambda x: finalOut(x))\
    .collect()

    print(footfall)

This provides the following output:

['IndividualX|AreaQ|2015-07-01 00:00:00|2015-07-01 03:00:00|180.0',
'IndividualX|AreaW|2015-07-01 03:00:00|2015-07-01 04:00:00|60.0',
'IndividualY|AreaZ|2015-07-02 04:00:00|2015-07-02 06:00:00|120.0',
'IndividualY|AreaW|2015-07-02 06:00:00|2015-07-02 07:00:00|60.0']