How to drop duplicates using conditions [duplicate]

Posted 2019-02-20 20:39

Question:

This question already has an answer here:

  • How to select the first row of each group? 8 answers

How can I delete duplicates while keeping the minimum value of level for each duplicated pair of item_id and country_id? I have the following DataFrame df:

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     312330|  13535670|            369|
|     312330|  13535670|            376|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            150|
|     311480|  69628930|            138|
|     311480|  69628930|            405|
+-----------+----------+---------------+

The expected output:

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            138|
+-----------+----------+---------------+

I know how to delete duplicates without conditions using dropDuplicates, but I don't know how to do it for my particular case.
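
For example, something like this, which keeps an arbitrary row per item_id/country_id pair rather than the one with the minimum level:

// keeps some row per (item_id, country_id) pair, not necessarily the minimum level
df.dropDuplicates("item_id", "country_id").show()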

Answer 1:

One method is to use orderBy (ascending by default), then groupBy, and aggregate with first:

import org.apache.spark.sql.functions.first
// sort by level ascending, then keep the first (i.e. smallest) level in each group
df.orderBy("level").groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)

You can also specify the sort order explicitly, using .asc for ascending and .desc for descending, as below:

df.orderBy($"level".asc).groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)
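
Since the goal is simply the minimum level per pair, a plain min aggregation is another option (a minimal sketch using org.apache.spark.sql.functions.min):

import org.apache.spark.sql.functions.min
// group by the pair and take the smallest level directly
df.groupBy("item_id", "country_id").agg(min("level").as("level")).show(false)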

You can also do this with a window and the row_number function, as below:

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("item_id", "country_id").orderBy($"level".asc)

import org.apache.spark.sql.functions.row_number
// number the rows within each group by ascending level and keep only the first one
df.withColumn("rank", row_number().over(windowSpec)).filter($"rank" === 1).drop("rank").show()
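
For reference, the sample DataFrame from the question can be recreated for testing along these lines (a sketch assuming a SparkSession is in scope as spark):

import spark.implicits._

val df = Seq(
  (312330, 13535670,  82),
  (312330, 13535670, 369),
  (312330, 13535670, 376),
  (319840, 69731210, 127),
  (319840, 69730600, 526),
  (311480, 69628930, 150),
  (311480, 69628930, 138),
  (311480, 69628930, 405)
).toDF("item_id", "country_id", "level")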