Structured Streaming custom deduplication

Posted 2019-08-09 10:31

Question:

I have streaming data coming in from Kafka into a DataFrame. I want to remove duplicates based on Id and keep only the latest record for each Id, based on the timestamp.

The sample data looks like this:

Id  Name    count   timestamp
1   Vikas   20      2018-09-19T10:10:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   30      2018-09-19T10:10:30
4   Vishal  10      2018-09-19T10:10:40
1   Vikas   50      2018-09-19T10:10:50
4   Vishal  40      2018-09-19T10:11:00
1   Vikas   10      2018-09-19T10:11:10
3   Vilas   20      2018-09-19T10:11:20

The output that I am expecting is:

Id  Name    count   timestamp
1   Vikas   10      2018-09-19T10:11:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   20      2018-09-19T10:11:20
4   Vishal  40      2018-09-19T10:11:00

Older duplicates are removed and only the most recent record for each Id is kept, based on the timestamp field.

I am using a watermark on the timestamp field. I have tried "df.dropDuplicates", but it keeps the older records intact and discards anything new.

My current code is as follows:

df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")
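
As an aside, with "Id", "timestamp" the (Id, timestamp) pair is the dedup key, so two rows with the same Id but different timestamps are never treated as duplicates. Keying on Id alone dedups per Id, but streaming deduplication keeps the first record it sees per key, not the latest, which matches the behavior described above. A sketch of that variant, using the same df and watermark:

df.withWatermark("timestamp", "1 Day")
  .dropDuplicates("Id")  // keeps the first record seen per Id, drops later ones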

How can we implement a custom dedup method so that the latest record is kept as the unique record?

Any help is appreciated.

Answer 1:

Sort on the timestamp column first, then drop the duplicates.

df.withWatermark("timestamp", "1 Day")
  .sort($"timestamp".desc)
  .dropDuplicates("Id", "timestamp")