I have a DataFrame with the following structure:
| id | time | x | y |
---------------------
| 1  | 1    | 0 | 3 |
| 1  | 2    | 3 | 2 |
| 1  | 5    | 6 | 1 |
| 2  | 1    | 3 | 7 |
| 2  | 2    | 1 | 9 |
| 3  | 1    | 7 | 5 |
| 3  | 2    | 9 | 3 |
| 3  | 7    | 2 | 5 |
| 3  | 8    | 4 | 7 |
| 4  | 1    | 7 | 9 |
| 4  | 2    | 9 | 0 |
What I'm trying to achieve is that, for each record, three more columns are created containing the time, x, and y of the next record (based on time). The catch is that we only take the next record if it has the same id value; otherwise the new three columns should be set to null.
Here is the output that I'm trying to get:
| id | time | x | y | time+1 | x+1  | y+1  |
--------------------------------------------
| 1  | 1    | 0 | 3 | 2      | 3    | 2    |
| 1  | 2    | 3 | 2 | 5      | 6    | 1    |
| 1  | 5    | 6 | 1 | null   | null | null |
| 2  | 1    | 3 | 7 | 2      | 1    | 9    |
| 2  | 2    | 1 | 9 | null   | null | null |
| 3  | 1    | 7 | 5 | 2      | 9    | 3    |
| 3  | 2    | 9 | 3 | 7      | 2    | 5    |
| 3  | 7    | 2 | 5 | 8      | 4    | 7    |
| 3  | 8    | 4 | 7 | null   | null | null |
| 4  | 1    | 7 | 9 | 2      | 9    | 0    |
| 4  | 2    | 9 | 0 | null   | null | null |
Is it possible to achieve this using Spark DataFrames?
You can use the window function lead.
First create a window by partitioning on the id column and ordering by time, then, when calling withColumn, pass the column you want to shift with an offset value of 1.
Something like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

val windowSpec = Window.partitionBy('id).orderBy('time)
dataset.withColumn("time1", lead('time, 1) over windowSpec).show
You can add the other columns in the same way.
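For example, the full set of shifted columns can be produced by chaining withColumn calls. This is just a sketch that reuses the windowSpec defined above; the "+1" column names simply mirror the desired output, and in an application (outside spark-shell) you also need import spark.implicits._ for the 'column syntax:

// Each lead call reads the corresponding column from the next row within the
// same id partition; the last row of each partition gets null.
val result = dataset
  .withColumn("time+1", lead('time, 1) over windowSpec)
  .withColumn("x+1",    lead('x, 1) over windowSpec)
  .withColumn("y+1",    lead('y, 1) over windowSpec)

result.show()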
If you are familiar with SQL, you can just create a temp view and generate all the columns in one go. Check this out:
scala> val df = Seq((1,1,0,3),(1,2,3,2),(1,5,6,1),(2,1,3,7),(2,2,1,9),(3,1,7,5),(3,2,9,3),(3,7,2,5),(3,8,4,7),(4,1,7,9),(4,2,9,0)).toDF("id","time","x","y")
df: org.apache.spark.sql.DataFrame = [id: int, time: int ... 2 more fields]
scala> df.createOrReplaceTempView("m2008")
scala> spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """).show(false)
+---+----+---+---+------+----+----+
|id |time|x |y |timep1|xp1 |yp1 |
+---+----+---+---+------+----+----+
|1 |1 |0 |3 |2 |3 |2 |
|1 |2 |3 |2 |5 |6 |1 |
|1 |5 |6 |1 |null |null|null|
|3 |1 |7 |5 |2 |9 |3 |
|3 |2 |9 |3 |7 |2 |5 |
|3 |7 |2 |5 |8 |4 |7 |
|3 |8 |4 |7 |null |null|null|
|4 |1 |7 |9 |2 |9 |0 |
|4 |2 |9 |0 |null |null|null|
|2 |1 |3 |7 |2 |1 |9 |
|2 |2 |1 |9 |null |null|null|
+---+----+---+---+------+----+----+
You can get the result back as another DataFrame by simply assigning the spark.sql result:
scala> val df2 = spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """)
df2: org.apache.spark.sql.DataFrame = [id: int, time: int ... 5 more fields]
scala> df2.printSchema
root
|-- id: integer (nullable = false)
|-- time: integer (nullable = false)
|-- x: integer (nullable = false)
|-- y: integer (nullable = false)
|-- timep1: integer (nullable = true)
|-- xp1: integer (nullable = true)
|-- yp1: integer (nullable = true)
In Scala you can also do it like this:
scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions.lead
scala> val part = Window.partitionBy('id).orderBy('time)
scala> spark.read.format("csv").option("inferSchema", "true").option("header", true).
     |   load("file:///home/ec2-user/test.csv").
     |   withColumn("time+1", lead('time, 1) over part).
     |   withColumn("x+1", lead('x, 1) over part).
     |   withColumn("y+1", lead('y, 1) over part).
     |   show()