Spark DataFrames: Combining Two Consecutive Rows

Posted 2019-08-27 23:46

Question:

I have a DataFrame with the following structure:

|  id  |  time  |  x  |  y  |
-----------------------------
|  1   |   1    |  0  |  3  |
|  1   |   2    |  3  |  2  |
|  1   |   5    |  6  |  1  |
|  2   |   1    |  3  |  7  |
|  2   |   2    |  1  |  9  |
|  3   |   1    |  7  |  5  |
|  3   |   2    |  9  |  3  |
|  3   |   7    |  2  |  5  |
|  3   |   8    |  4  |  7  |
|  4   |   1    |  7  |  9  |
|  4   |   2    |  9  |  0  |

What I'm trying to achieve is that, for each record, three more columns are created containing the time, x, and y of the next record (based on time). The catch is that we only take the next record if it has the same id value; otherwise the three new columns should be set to null.

Here is the output I'm trying to get:

|  id  |  time  |  x  |  y  | time+1 | x+1 | y+1 |
--------------------------------------------------
|  1   |   1    |  0  |  3  |   2    |  3  |  2  |
|  1   |   2    |  3  |  2  |   5    |  6  |  1  |
|  1   |   5    |  6  |  1  |  null  | null| null|
|  2   |   1    |  3  |  7  |   2    |  1  |  9  |
|  2   |   2    |  1  |  9  |  null  | null| null|
|  3   |   1    |  7  |  5  |   2    |  9  |  3  |
|  3   |   2    |  9  |  3  |   7    |  2  |  5  |
|  3   |   7    |  2  |  5  |   8    |  4  |  7  |
|  3   |   8    |  4  |  7  |  null  | null| null|
|  4   |   1    |  7  |  9  |   2    |  9  |  0  |
|  4   |   2    |  9  |  0  |  null  | null| null|

Is it possible to achieve this using Spark DataFrames?

Answer 1:

You can use the window function lead. First create a window partitioned by the id column and ordered by time; then, in a withColumn call, apply lead to the column you want to shift, with an offset of 1.

Something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead  // lead is not in scope by default
import spark.implicits._  // assumes a SparkSession named spark; enables the 'id symbol syntax

// Look one row ahead within each id partition, ordered by time.
val windowSpec = Window.partitionBy('id).orderBy('time)
dataset.withColumn("time1", lead('time, 1) over windowSpec).show

You can add the other columns in the same way, as sketched below.
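
For instance, a minimal sketch chaining three withColumn calls (untested; it reuses windowSpec from above, result is just an arbitrary name, and the new column names match the desired output):

// The last row of each id partition has no successor, so lead() yields null there.
val result = dataset
  .withColumn("time+1", lead('time, 1) over windowSpec)
  .withColumn("x+1", lead('x, 1) over windowSpec)
  .withColumn("y+1", lead('y, 1) over windowSpec)
result.show()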



Answer 2:

If you are familiar with SQL, just create a temp view and build all the columns in one go. Check this out:

scala> val df = Seq((1,1,0,3),(1,2,3,2),(1,5,6,1),(2,1,3,7),(2,2,1,9),(3,1,7,5),(3,2,9,3),(3,7,2,5),(3,8,4,7),(4,1,7,9),(4,2,9,0)).toDF("id","time","x","y")
df: org.apache.spark.sql.DataFrame = [id: int, time: int ... 2 more fields]

scala> df.createOrReplaceTempView("m2008")

scala> spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """).show(false)
+---+----+---+---+------+----+----+
|id |time|x  |y  |timep1|xp1 |yp1 |
+---+----+---+---+------+----+----+
|1  |1   |0  |3  |2     |3   |2   |
|1  |2   |3  |2  |5     |6   |1   |
|1  |5   |6  |1  |null  |null|null|
|3  |1   |7  |5  |2     |9   |3   |
|3  |2   |9  |3  |7     |2   |5   |
|3  |7   |2  |5  |8     |4   |7   |
|3  |8   |4  |7  |null  |null|null|
|4  |1   |7  |9  |2     |9   |0   |
|4  |2   |9  |0  |null  |null|null|
|2  |1   |3  |7  |2     |1   |9   |
|2  |2   |1  |9  |null  |null|null|
+---+----+---+---+------+----+----+



You can get the result back as another DataFrame by simply assigning the spark.sql output:

scala> val df2 = spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """)
df2: org.apache.spark.sql.DataFrame = [id: int, time: int ... 5 more fields]

scala> df2.printSchema
root
 |-- id: integer (nullable = false)
 |-- time: integer (nullable = false)
 |-- x: integer (nullable = false)
 |-- y: integer (nullable = false)
 |-- timep1: integer (nullable = true)
 |-- xp1: integer (nullable = true)
 |-- yp1: integer (nullable = true)


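If you prefer to skip the temp view, the same window expressions can also be passed straight to selectExpr (a sketch assuming Spark 2.0+, where SQL window functions are accepted inside selectExpr; df3 is an arbitrary name):

scala> val df3 = df.selectExpr("*",
     |   "lead(time) over(partition by id order by time) as timep1",
     |   "lead(x) over(partition by id order by time) as xp1",
     |   "lead(y) over(partition by id order by time) as yp1")

scala> df3.show(false)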


Answer 3:

In Scala you can also do it like this:

scala> import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions.lead

scala> val part = Window.partitionBy('id).orderBy('time)

scala> spark.read.format("csv").
     |   option("inferSchema", "true").
     |   option("header", "true").
     |   load("file:///home/ec2-user/test.csv").
     |   withColumn("time+1", lead('time, 1) over part).
     |   withColumn("x+1", lead('x, 1) over part).
     |   withColumn("y+1", lead('y, 1) over part).
     |   show()
