Compare Value of Current and Previous Row in Spark

Published 2020-05-29 01:02

Question:

I am trying to compare the current and previous row in the DataFrame below and recalculate the AMOUNT column.

scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")

scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    70|
|    4|789|    80|
+-----+---+------+

Calculation Logic:

  1. For row no 1, keep its own AMOUNT, i.e. 50.
  2. For row no 2, if the ID of SL_NO 2 differs from that of SL_NO 1, keep the AMOUNT of SL_NO 2 (i.e. 30); otherwise take the AMOUNT of SL_NO 1 (i.e. 50).
  3. For row no 3, if the ID of SL_NO 3 differs from that of SL_NO 2, keep the AMOUNT of SL_NO 3 (i.e. 70); otherwise take the AMOUNT of SL_NO 2 (i.e. 30).

The same logic applies to the remaining rows.
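The rules above boil down to: keep a row's AMOUNT when its ID differs from the previous row's, otherwise carry over the previous row's original AMOUNT. A minimal plain-Scala sketch of that rule on a local sequence (no Spark; the `Row` case class here is just for illustration):

```scala
// Plain-Scala sketch of the carry-over rule described above (no Spark).
case class Row(slNo: Int, id: Int, amount: Int)

val rows = Seq(Row(1, 123, 50), Row(2, 456, 30), Row(3, 456, 70), Row(4, 789, 80))

// Pair each row with its predecessor (None for the first row), then apply the rule:
// same ID as previous row -> take the previous row's AMOUNT, otherwise keep its own.
val result = (None +: rows.init.map(Some(_))).zip(rows).map {
  case (Some(prev), cur) if prev.id == cur.id => cur.copy(amount = prev.amount)
  case (_, cur)                               => cur
}
// result amounts: 50, 30, 30, 80
```

Note that the rule always reads the predecessor's original AMOUNT, not an updated value, which matches how `lag` over a window behaves in the Spark answer below.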

Expected Output:

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+

Please help.

Answer 1:

You could use lag together with when/otherwise; here is a demonstration:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

val w = Window.orderBy($"SL_NO")

// Carry over the previous row's AMOUNT when the ID repeats,
// otherwise keep the row's own AMOUNT.
dataset.withColumn("AMOUNT",
    when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w)).otherwise($"AMOUNT")
).show

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+

Note: since this example uses a window without a partition, it may cause performance problems: Spark has to move all rows into a single partition to establish the global ordering. On real data it would help if the problem can be partitioned by some variable, e.g. Window.partitionBy($"ID").orderBy($"SL_NO"), depending on your actual problem and on whether rows with the same ID are consecutive.