Comparing the values of columns in two dataframes

Posted 2019-01-29 03:54

Question:

I have two dataframes: one has a unique id per row, and the other can have multiple rows for the same id.

This is dataframe df1:

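+---------------+-------------------+-----+------------------+
|             id|                 dt|speed|             stats|
+---------------+-------------------+-----+------------------+
|358899055773504|2018-07-31 18:38:34|    0|[9,-1,-1,13,0,1,0]|
|358899055773505|2018-07-31 18:48:23|    4| [8,-1,0,22,1,1,1]|
+---------------+-------------------+-----+------------------+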

df2:

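+---------------+-------------------+-----+------------------+
|             id|                 dt|speed|             stats|
+---------------+-------------------+-----+------------------+
|358899055773504|2018-07-31 18:38:34|    0|[9,-1,-1,13,0,1,0]|
|358899055773505|2018-07-31 18:54:23|    4|  [9,0,0,22,1,1,1]|
|358899055773504|2018-07-31 18:58:34|    0| [9,0,-1,22,0,1,0]|
|358899055773504|2018-07-31 18:28:34|    0| [9,0,-1,22,0,1,0]|
|358899055773505|2018-07-31 18:38:23|    4| [8,-1,0,22,1,1,1]|
+---------------+-------------------+-----+------------------+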

I want to compare the second dataframe with the first one and update the values in the first dataframe, but only if the dt of a particular id in df2 is greater than the dt for that id in df1. If the greater-than condition is satisfied, the other fields should then be compared as well.

Answer 1:

You need to join the two dataframes together to make any comparison of their columns.

What you can do is first join the dataframes and then apply all the filtering to get a new dataframe containing every row that should be updated:

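import spark.implicits._                                // enables $"col"; assumes a SparkSession named spark

val diffDf = df1.as("a").join(df2.as("b"), Seq("id"))
  .filter($"b.dt" > $"a.dt")                            // keep only rows of df2 that are newer
  // .filter(...)                                       // any other filter required
  .select($"id", $"b.dt", $"b.speed", $"b.stats")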
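The question also asks for the other fields to be compared once the time condition holds, which is what the filter(...) placeholder is for. Below is a minimal sketch using the question's sample data, assuming a hypothetical rule that a newer df2 row only counts as an update when its speed or stats differ from df1. The object name NewerAndChanged and the rule itself are illustrative, not part of the original answer. dt is kept as a string, which compares correctly because the yyyy-MM-dd HH:mm:ss layout sorts chronologically as text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example: the object name and the "speed or stats changed" rule are assumptions.
object NewerAndChanged {
  // Sample rows from the question; dt stays a string ("yyyy-MM-dd HH:mm:ss" sorts chronologically).
  def sample(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val df1 = Seq(
      (358899055773504L, "2018-07-31 18:38:34", 0, Seq(9, -1, -1, 13, 0, 1, 0)),
      (358899055773505L, "2018-07-31 18:48:23", 4, Seq(8, -1, 0, 22, 1, 1, 1))
    ).toDF("id", "dt", "speed", "stats")
    val df2 = Seq(
      (358899055773504L, "2018-07-31 18:38:34", 0, Seq(9, -1, -1, 13, 0, 1, 0)),
      (358899055773505L, "2018-07-31 18:54:23", 4, Seq(9, 0, 0, 22, 1, 1, 1)),
      (358899055773504L, "2018-07-31 18:58:34", 0, Seq(9, 0, -1, 22, 0, 1, 0)),
      (358899055773504L, "2018-07-31 18:28:34", 0, Seq(9, 0, -1, 22, 0, 1, 0)),
      (358899055773505L, "2018-07-31 18:38:23", 4, Seq(8, -1, 0, 22, 1, 1, 1))
    ).toDF("id", "dt", "speed", "stats")
    (df1, df2)
  }

  // Newer rows of df2 whose speed or stats differ from the df1 row with the same id.
  def diff(spark: SparkSession, df1: DataFrame, df2: DataFrame): DataFrame = {
    import spark.implicits._
    df1.as("a").join(df2.as("b"), Seq("id"))
      .filter($"b.dt" > $"a.dt")                                       // time condition
      .filter($"b.speed" =!= $"a.speed" || $"b.stats" =!= $"a.stats") // assumed field comparison
      .select($"id", $"b.dt", $"b.speed", $"b.stats")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("newer-and-changed").getOrCreate()
    val (df1, df2) = sample(spark)
    diff(spark, df1, df2).orderBy("id").show(false)
    spark.stop()
  }
}
```

With the sample data both newer rows also carry different stats, so both are kept; a stricter rule that looked at speed alone would keep none of them.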

Note: In some situations a groupBy on id or a window function is required, since there should be only one final row per id in the diffDf dataframe (for example, when df2 holds several newer rows for the same id). This can be done as follows (this example selects the row with the maximum speed, but the right choice depends on the actual requirements):

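import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val w = Window.partitionBy($"id").orderBy($"speed".desc)   // rank rows within each id, highest speed first
val diffDf2 = diffDf.withColumn("rn", row_number().over(w)).where($"rn" === 1).drop("rn")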
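Ordering by speed is only one possible tie-breaker. If the intent is that the most recent update wins, ordering the window by dt descending keeps the newest df2 row per id. Here is a sketch under that assumption; the object name LatestPerId and the extra 18:50:00 row for id 358899055773504 are hypothetical, added only so that one id has two newer rows to choose between.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Hypothetical helper: keeps one row per id, the one with the most recent dt.
object LatestPerId {
  def latest(spark: SparkSession, diffDf: DataFrame): DataFrame = {
    import spark.implicits._
    val w = Window.partitionBy($"id").orderBy($"dt".desc)   // newest dt gets row number 1
    diffDf.withColumn("rn", row_number().over(w)).where($"rn" === 1).drop("rn")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("latest-per-id").getOrCreate()
    import spark.implicits._
    // Hypothetical diffDf: the 18:50:00 row is invented so that id ...504 has two newer rows.
    val diffDf = Seq(
      (358899055773504L, "2018-07-31 18:58:34", 0, Seq(9, 0, -1, 22, 0, 1, 0)),
      (358899055773504L, "2018-07-31 18:50:00", 0, Seq(9, 0, -1, 22, 0, 1, 0)),
      (358899055773505L, "2018-07-31 18:54:23", 4, Seq(9, 0, 0, 22, 1, 1, 1))
    ).toDF("id", "dt", "speed", "stats")
    latest(spark, diffDf).orderBy("id").show(false)
    spark.stop()
  }
}
```

Because dt is a string in yyyy-MM-dd HH:mm:ss form, descending text order matches descending time order; the same window works unchanged on a timestamp column.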

More in-depth information about the different approaches can be found in the question "How to max value and keep all columns (for max records per group)?".


To replace the old rows with the same id in the df1 dataframe, combine the dataframes with an outer join and coalesce:

import org.apache.spark.sql.functions.coalesce

val df = df1.as("a").join(diffDf.as("b"), Seq("id"), "outer")
  .select(
    $"id",
    coalesce($"b.dt", $"a.dt").as("dt"),            // prefer the updated value from diffDf
    coalesce($"b.speed", $"a.speed").as("speed"),
    coalesce($"b.stats", $"a.stats").as("stats")
  )

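coalesce returns its first non-null argument, so each column takes the value from the diffDf (b) dataframe when there is one. For an id with no row in diffDf, the outer join leaves the b columns null and the value from df1 (a) is used instead.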

Result when only using the time filter with the provided example input dataframes:

+---------------+-------------------+-----+-----------------+
|             id|                 dt|speed|            stats|
+---------------+-------------------+-----+-----------------+
|358899055773504|2018-07-31 18:58:34|    0|[9,0,-1,22,0,1,0]|
|358899055773505|2018-07-31 18:54:23|    4| [9,0,0,22,1,1,1]|
+---------------+-------------------+-----+-----------------+