Difference between two rows in Spark dataframe

Posted 2020-01-29 06:26

Question:

I created a DataFrame in Spark by grouping on column1 and date and summing the amount.

val table = df1.groupBy($"column1", $"date").sum("amount")

Column1 |Date   |Amount
A       |1-jul  |1000
A       |1-june |2000
A       |1-May  |2000
A       |1-dec  |3000
A       |1-Nov  |2000
B       |1-jul  |100
B       |1-june |300
B       |1-May  |400
B       |1-dec  |300

Now I want to add a new column containing the difference between the amounts for any two dates in the table.

Answer 1:

You can use a Window function if the calculation is fixed, for example the difference from the previous month, or from two months back. The lag and lead functions, applied over a Window, give you the value from an earlier or later row.

For that to work, the date column first has to be converted to a type that sorts chronologically, as below.

+-------+------+--------------+------+
|Column1|Date  |Date_Converted|Amount|
+-------+------+--------------+------+
|A      |1-jul |2017-07-01    |1000  |
|A      |1-june|2017-06-01    |2000  |
|A      |1-May |2017-05-01    |2000  |
|A      |1-dec |2017-12-01    |3000  |
|A      |1-Nov |2017-11-01    |2000  |
|B      |1-jul |2017-07-01    |100   |
|B      |1-june|2017-06-01    |300   |
|B      |1-May |2017-05-01    |400   |
|B      |1-dec |2017-12-01    |300   |
+-------+------+--------------+------+
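
The answer does not show how Date_Converted was produced. A minimal sketch of one way to parse strings such as "1-jul" or "1-june", in plain Scala so it can be tried outside Spark. The helper name toSortableDate and the monthNumbers map are hypothetical, and the year is not present in the data, so 2017 is assumed here only to match the table above:

```scala
import java.time.LocalDate
import scala.util.Try

// Hypothetical lookup: the first three letters of a month name -> month number.
val monthNumbers: Map[String, Int] = Map(
  "jan" -> 1, "feb" -> 2, "mar" -> 3, "apr" -> 4, "may" -> 5, "jun" -> 6,
  "jul" -> 7, "aug" -> 8, "sep" -> 9, "oct" -> 10, "nov" -> 11, "dec" -> 12)

// Parses "day-month" strings such as "1-jul", "1-june" or "1- jul".
// assumedYear is an assumption: the source strings carry no year.
// Returns None for anything that does not parse to a valid date.
def toSortableDate(raw: String, assumedYear: Int = 2017): Option[LocalDate] =
  raw.split("-").map(_.trim) match {
    case Array(day, month) =>
      for {
        m    <- monthNumbers.get(month.toLowerCase.take(3))
        date <- Try(LocalDate.of(assumedYear, m, day.toInt)).toOption
      } yield date
    case _ => None
  }
```

For example, toSortableDate("1-june") gives Some(2017-06-01). To use this in Spark it could be wrapped in a UDF that returns the ISO string (date.toString), which also sorts chronologically.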

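You can find the difference between each row and the previous row in its Column1 group (that is, the previous available month, which is not necessarily the previous calendar month) by doing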

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")

// lag returns null on the first row of each partition, so a missing previous amount is treated as 0
val prevAmount = lag("Amount", 1).over(windowSpec)
df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when(prevAmount.isNull, 0).otherwise(prevAmount))
  .show(false)

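You should get the following (the first row of each group has no predecessor, so its difference equals its own amount)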

+-------+------+--------------+------+------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_Month|
+-------+------+--------------+------+------------------------+
|B      |1-May |2017-05-01    |400   |400.0                   |
|B      |1-june|2017-06-01    |300   |-100.0                  |
|B      |1-jul |2017-07-01    |100   |-200.0                  |
|B      |1-dec |2017-12-01    |300   |200.0                   |
|A      |1-May |2017-05-01    |2000  |2000.0                  |
|A      |1-june|2017-06-01    |2000  |0.0                     |
|A      |1-jul |2017-07-01    |1000  |-1000.0                 |
|A      |1-Nov |2017-11-01    |2000  |1000.0                  |
|A      |1-dec |2017-12-01    |3000  |1000.0                  |
+-------+------+--------------+------+------------------------+

To compare against the row two positions back instead, increase the lag offset to 2

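// Same pattern with an offset of 2; the first two rows of each partition have no lagged value
val prevTwoAmount = lag("Amount", 2).over(windowSpec)
df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when(prevTwoAmount.isNull, 0).otherwise(prevTwoAmount))
  .show(false)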

which will give you

+-------+------+--------------+------+----------------------------+
|Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
+-------+------+--------------+------+----------------------------+
|B      |1-May |2017-05-01    |400   |400.0                       |
|B      |1-june|2017-06-01    |300   |300.0                       |
|B      |1-jul |2017-07-01    |100   |-300.0                      |
|B      |1-dec |2017-12-01    |300   |0.0                         |
|A      |1-May |2017-05-01    |2000  |2000.0                      |
|A      |1-june|2017-06-01    |2000  |2000.0                      |
|A      |1-jul |2017-07-01    |1000  |-1000.0                     |
|A      |1-Nov |2017-11-01    |2000  |0.0                         |
|A      |1-dec |2017-12-01    |3000  |2000.0                      |
+-------+------+--------------+------+----------------------------+
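
To check the numbers in the two tables by hand, here is a plain-Scala model of what the expression above computes: Amount minus the lagged Amount, with a missing lagged value treated as 0. It is only a sketch of the semantics, not Spark code; AmountRow and diffWithLag are hypothetical names, and the dates are assumed to be ISO strings so that string order matches chronological order:

```scala
// Hypothetical row type mirroring Column1, Date_Converted and Amount.
final case class AmountRow(key: String, date: String, amount: Int)

// For each key: sort by date, then subtract the amount found `offset` rows earlier.
// Rows with fewer than `offset` predecessors use 0, like the when/otherwise in the answer.
def diffWithLag(rows: Seq[AmountRow], offset: Int): Seq[(AmountRow, Int)] =
  rows.groupBy(_.key).toSeq.flatMap { case (_, group) =>
    val ordered = group.sortBy(_.date) // ISO yyyy-MM-dd strings sort chronologically
    ordered.zipWithIndex.map { case (row, i) =>
      val previous = if (i >= offset) ordered(i - offset).amount else 0
      row -> (row.amount - previous)
    }
  }

// Group B from the tables above, deliberately out of order.
val groupB = Seq(
  AmountRow("B", "2017-07-01", 100),
  AmountRow("B", "2017-06-01", 300),
  AmountRow("B", "2017-05-01", 400),
  AmountRow("B", "2017-12-01", 300))

val byOneRow  = diffWithLag(groupB, 1).map(_._2) // 400, -100, -200, 200
val byTwoRows = diffWithLag(groupB, 2).map(_._2) // 400, 300, -300, 0
```

Both results agree with the B rows of the tables. Note that the 200 for 1-dec is December minus July, because the lag counts rows rather than calendar months.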

I hope the answer is helpful



Answer 2:

Assuming both dates are present in each group of your table.

My imports:

import org.apache.spark.sql.functions.{concat_ws,collect_list,lit,sum,udf}

Prepare the DataFrame:

scala> val seqRow = Seq(
 | ("A","1- jul",1000),
 | ("A","1-june",2000),
 | ("A","1-May",2000),
 | ("A","1-dec",3000),
 | ("B","1-jul",100),
 | ("B","1-june",300),
 | ("B","1-May",400),
 | ("B","1-dec",300))

seqRow: Seq[(String, String, Int)] = List((A,1- jul,1000), (A,1-june,2000), (A,1-May,2000), (A,1-dec,3000), (B,1-jul,100), (B,1-june,300), (B,1-May,400), (B,1-dec,300))

scala> val input_df = sc.parallelize(seqRow).toDF("column1","date","amount")
input_df: org.apache.spark.sql.DataFrame = [column1: string, date: string ... 1 more field]

Now write a UDF for your case. It expects a list of "date$amount" strings and the two month names to compare:

scala> def calc_diff = udf((list : Seq[String],startMonth : String,endMonth : String) => {
     |     //get the month and their values
     |     val monthMap = list.map{str =>
     |     val splitText = str.split("\\$")
     |     val month = splitText(0).split("-")(1).trim
     |
     |         (month.toLowerCase,splitText(1).toInt)
     |     }.toMap
     |
     |     val stMnth = monthMap(startMonth)
     |     val endMnth = monthMap(endMonth)
     |     endMnth - stMnth
     |
     | })
calc_diff: org.apache.spark.sql.expressions.UserDefinedFunction

Next, group the rows and collect each group's "date$amount" pairs into a list. The original post omits this step; the definition below is reconstructed from the collect_val column, the "$" split inside the UDF, and the sum_amount column in the output:

scala> val group_df = input_df.groupBy('column1).agg(sum('amount).as("sum_amount"), collect_list(concat_ws("$", 'date, 'amount)).as("collect_val"))

Now prepare the output:

scala> val (month1 : String,month2 : String) = ("jul","dec")
month1: String = jul
month2: String = dec

scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))).drop('collect_val)
req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 1 more field]

scala> req_df.orderBy('column1).show
+-------+----------+----+
|column1|sum_amount|diff|
+-------+----------+----+
|      A|      8000|2000|
|      B|      1100| 200|
+-------+----------+----+

Hope this is what you want.
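
One caveat: the UDF looks months up with monthMap(startMonth), which throws a NoSuchElementException if a group lacks one of the two months. A minimal sketch of a more forgiving version of the same parsing logic, in plain Scala so it can be tried without a cluster. safeDiff is a hypothetical name; wrapping it with udf(...) is left as an assumption, on the understanding that Spark turns a None result from a Scala UDF into null:

```scala
import scala.util.Try

// Hypothetical null-safe variant of the lookup inside calc_diff.
// Each entry is expected to look like "1-jul$1000" (date, "$", amount).
// Malformed entries are skipped; a missing month yields None instead of an exception.
def safeDiff(entries: Seq[String], startMonth: String, endMonth: String): Option[Int] = {
  val monthMap: Map[String, Int] = entries.flatMap { entry =>
    entry.split("\\$") match {
      case Array(date, amount) =>
        for {
          month <- date.split("-").lift(1).map(_.trim.toLowerCase)
          value <- Try(amount.trim.toInt).toOption
        } yield month -> value
      case _ => None
    }
  }.toMap

  for {
    start <- monthMap.get(startMonth.toLowerCase)
    end   <- monthMap.get(endMonth.toLowerCase)
  } yield end - start
}
```

With group A's values, safeDiff(Seq("1- jul$1000", "1-june$2000", "1-May$2000", "1-dec$3000"), "jul", "dec") gives Some(2000), matching the diff column above, while asking for a month that is absent gives None.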



Answer 3:

Filter to the two dates of interest, pivot them into columns, and subtract one from the other:

(table.filter($"Date".isin("1-jul", "1-dec"))
      .groupBy("Column1")
      .pivot("Date")
      .agg(first($"Amount"))
      .withColumn("diff", $"1-dec" - $"1-jul")
).show
+-------+-----+-----+----+
|Column1|1-dec|1-jul|diff|
+-------+-----+-----+----+
|      B|  300|  100| 200|
|      A| 3000| 1000|2000|
+-------+-----+-----+----+