Drop duplicates for each partition

Posted 2019-03-06 07:24

Question:

Original data (---- marks a partition boundary):

cls, id  
----
a, 1
a, 1
----
b, 3
b, 3
b, 4

Expected output:

cls, id  
----
a, 1
----
b, 3
b, 4

id can be duplicated only within the same cls; in other words, the same id never appears across different cls values.

In that case,

df.dropDuplicates("id")

shuffles data across all partitions to check for duplicates over every cls, and the result is repartitioned into 200 partitions (the default value).
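For reference, that shuffle width is controlled by spark.sql.shuffle.partitions; a minimal sketch, assuming a SparkSession named spark:

// Number of partitions produced by wide operations such as dropDuplicates
spark.conf.get("spark.sql.shuffle.partitions")        // "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "50")  // tune it if needed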

Now, how can I run dropDuplicates for each partition separately to reduce the computing cost?

Something like:

df.foreachPartition(_.dropDuplicates())

Answer 1:

You're probably after something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import scala.collection.mutable

val distinct = df.mapPartitions { it =>
  // deduplicate within this partition only; no shuffle is triggered
  val set = mutable.Set.empty[Row]
  while (it.hasNext) {
    set += it.next()
  }
  set.iterator
}(RowEncoder(df.schema))
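Note that per-partition deduplication is only globally correct because each cls lives in a single partition. A small usage sketch (the sample data, the spark session name, and the repartition call are illustrative assumptions; the repartition just reproduces the layout described in the question, and in a real job the DataFrame would be prepared like this before the mapPartitions step):

import spark.implicits._

// Build the question's sample and co-locate each cls in one partition
val df = Seq(("a", 1), ("a", 1), ("b", 3), ("b", 3), ("b", 4))
  .toDF("cls", "id")
  .repartition($"cls")

With df laid out like this, the mapPartitions snippet above yields (a, 1), (b, 3), (b, 4), matching the expected output, without the full shuffle that dropDuplicates would perform.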