I have two dataframes: one has unique values of id, and the other can have multiple rows for each id.
This is dataframe df1:
id | dt | speed | stats
358899055773504 2018-07-31 18:38:34 0 [9,-1,-1,13,0,1,0]
358899055773505 2018-07-31 18:48:23 4 [8,-1,0,22,1,1,1]
This is dataframe df2:
id | dt | speed | stats
358899055773504 2018-07-31 18:38:34 0 [9,-1,-1,13,0,1,0]
358899055773505 2018-07-31 18:54:23 4 [9,0,0,22,1,1,1]
358899055773504 2018-07-31 18:58:34 0 [9,0,-1,22,0,1,0]
358899055773504 2018-07-31 18:28:34 0 [9,0,-1,22,0,1,0]
358899055773505 2018-07-31 18:38:23 4 [8,-1,0,22,1,1,1]
I want to compare the second dataframe with the first and update the values in the first dataframe only if the value of dt for a particular id in df2 is greater than that in df1. If that greater-than condition is satisfied, the other fields should be compared as well.
You need to join the two dataframes to compare their columns.
You can first join the dataframes and then apply all the filtering to get a new dataframe containing the rows that should be updated:
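// Assumes `import spark.implicits._` is in scope for the $"..." column syntax
val diffDf = df1.as("a").join(df2.as("b"), Seq("id"))
  .filter($"b.dt" > $"a.dt") // keep only df2 rows newer than the matching df1 row
  .filter(...) // Any other filter required
  .select($"id", $"b.dt", $"b.speed", $"b.stats")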
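To try the join and filter outside of an existing session, the sketch below rebuilds the two example dataframes from the question and applies only the time filter. The local SparkSession, the app name, and storing stats as a plain string are assumptions made for illustration. dt is cast to a timestamp so that `>` compares times rather than strings. It is written script-style, as it would be typed into spark-shell.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("df-update-sketch").getOrCreate()
import spark.implicits._

// Rows copied from the question; stats is kept as a string for simplicity (assumption).
val df1 = Seq(
  (358899055773504L, "2018-07-31 18:38:34", 0, "[9,-1,-1,13,0,1,0]"),
  (358899055773505L, "2018-07-31 18:48:23", 4, "[8,-1,0,22,1,1,1]")
).toDF("id", "dt", "speed", "stats").withColumn("dt", $"dt".cast("timestamp"))

val df2 = Seq(
  (358899055773504L, "2018-07-31 18:38:34", 0, "[9,-1,-1,13,0,1,0]"),
  (358899055773505L, "2018-07-31 18:54:23", 4, "[9,0,0,22,1,1,1]"),
  (358899055773504L, "2018-07-31 18:58:34", 0, "[9,0,-1,22,0,1,0]"),
  (358899055773504L, "2018-07-31 18:28:34", 0, "[9,0,-1,22,0,1,0]"),
  (358899055773505L, "2018-07-31 18:38:23", 4, "[8,-1,0,22,1,1,1]")
).toDF("id", "dt", "speed", "stats").withColumn("dt", $"dt".cast("timestamp"))

// Inner join on id, then keep only the df2 rows that are strictly newer.
val diffDf = df1.as("a").join(df2.as("b"), Seq("id"))
  .filter($"b.dt" > $"a.dt")
  .select($"id", $"b.dt", $"b.speed", $"b.stats")

diffDf.orderBy($"id").show(false)
```

With this sample data each id has exactly one newer row in df2, so no further deduplication is needed here; with real data that may not hold.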
Note: In some situations it will be necessary to use groupBy(id) or a window function, since there should be only one final row per id in the diffDf dataframe. This can be done as follows (the example here selects the row with the maximum speed, but the right choice depends on the actual requirements):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val w = Window.partitionBy($"id").orderBy($"speed".desc)
val diffDf2 = diffDf.withColumn("rn", row_number().over(w)).where($"rn" === 1).drop("rn")
More in-depth information about the different approaches can be found here: How to max value and keep all columns (for max records per group)?
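For comparison, here is a minimal sketch of the groupBy variant mentioned in the note, assuming the requirement is to keep the row with the latest dt per id. The sample rows, the `candidates` and `latest` names, and the string type for stats are made up for illustration. It relies on `max` over a struct, which compares fields from left to right, so dt has to be the first field of the struct.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, struct}

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("groupby-sketch").getOrCreate()
import spark.implicits._

// Hypothetical candidate rows: two possible updates for id 1, one for id 2.
val candidates = Seq(
  (1L, "2018-07-31 18:54:23", 4, "[9,0,0,22,1,1,1]"),
  (1L, "2018-07-31 18:58:34", 0, "[9,0,-1,22,0,1,0]"),
  (2L, "2018-07-31 18:40:00", 3, "[8,0,0,22,1,1,1]")
).toDF("id", "dt", "speed", "stats").withColumn("dt", $"dt".cast("timestamp"))

// Pack the columns into a struct with dt first, take the max per id,
// then unpack the struct back into separate columns.
val latest = candidates
  .groupBy($"id")
  .agg(max(struct($"dt", $"speed", $"stats")).as("m"))
  .select($"id", $"m.dt", $"m.speed", $"m.stats")

latest.orderBy($"id").show(false)
```

Compared with the window approach, this avoids the helper rn column, but the ordering criterion is fixed by the field order inside the struct.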
To replace the old rows with the same id in the df1 dataframe, combine the dataframes with an outer join and coalesce:
import org.apache.spark.sql.functions.coalesce

val df = df1.as("a").join(diffDf.as("b"), Seq("id"), "outer")
  .select(
    $"id",
    coalesce($"b.dt", $"a.dt").as("dt"), // prefer the newer value from diffDf
    coalesce($"b.speed", $"a.speed").as("speed"),
    coalesce($"b.stats", $"a.stats").as("stats")
  )
coalesce works by first trying to take the value from the diffDf (b) dataframe. If that value is null, it takes the value from df1 (a).
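As a small illustration of that fallback behaviour, the sketch below applies coalesce to a hypothetical two-column dataframe. The column names `updated` and `original` and the values are invented for this example: the first row has an updated value, while the second has null there, as happens after the outer join for ids with no newer row.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("coalesce-sketch").getOrCreate()
import spark.implicits._

// None becomes a null in the `updated` column.
val pairs = Seq[(Option[Int], Int)](
  (Some(10), 1),
  (None, 2)
).toDF("updated", "original")

// coalesce returns the first non-null argument for each row.
val merged = pairs.select(coalesce($"updated", $"original").as("value"))

merged.show()
```

The first row yields 10 (the updated value wins) and the second yields 2 (the original is used because the update is null).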
Result when only using the time filter with the provided example input dataframes:
+---------------+-------------------+-----+-----------------+
|             id|                 dt|speed|            stats|
+---------------+-------------------+-----+-----------------+
|358899055773504|2018-07-31 18:58:34|    0|[9,0,-1,22,0,1,0]|
|358899055773505|2018-07-31 18:54:23|    4| [9,0,0,22,1,1,1]|
+---------------+-------------------+-----+-----------------+