Removing duplicates from rows based on specific columns in Spark/PySpark

Posted 2020-01-29 04:05

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only.

Removing entirely duplicate rows is straightforward:

data = data.distinct()

and either row 5 or row 6 will be removed.

But how do I remove duplicate rows based only on columns 1, 3 and 4? That is, remove either one of these:

('Baz',22,'US',6)
('Baz',36,'US',6)

In Python pandas, this could be done by specifying the columns with .drop_duplicates(). How can I achieve the same in Spark/PySpark?
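For reference, the pandas call would look something like this (a sketch; the column labels here are hypothetical):

import pandas as pd

# Hypothetical column labels for the four fields above
df = pd.DataFrame(
    [('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1),
     ('Bar', 57, 'CA', 2), ('Bar', 72, 'CA', 2),
     ('Baz', 22, 'US', 6), ('Baz', 36, 'US', 6)],
    columns=['name', 'age', 'country', 'count'])

# Drop duplicates considering only the first, third and fourth columns
df.drop_duplicates(subset=['name', 'country', 'count'])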

7 Answers
霸刀☆藐视天下
#2 · 2020-01-29 04:13

Agree with David. To add on: we may not want to groupBy all columns other than the column(s) in the aggregate function, i.e., we may want to remove duplicates based purely on a subset of columns while retaining all columns of the original DataFrame. The better way to do this is to use the dropDuplicates DataFrame API, available since Spark 1.4.0.

For reference, see: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
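A minimal PySpark sketch of this, assuming the question's RDD is converted to a DataFrame with hypothetical column names (requires an active SQLContext/SparkSession):

# Convert the RDD of tuples from the question into a DataFrame
df = data.toDF(['name', 'age', 'country', 'count'])

# Deduplicate on the first, third and fourth columns only,
# keeping all four columns in the result
df.dropDuplicates(['name', 'country', 'count']).show()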

何必那么认真
#3 · 2020-01-29 04:13

I used the built-in function dropDuplicates(). Scala code is given below:

val data = sc.parallelize(List(("Foo", 41, "US", 3),
                               ("Foo", 39, "UK", 1),
                               ("Bar", 57, "CA", 2),
                               ("Bar", 72, "CA", 2),
                               ("Baz", 22, "US", 6),
                               ("Baz", 36, "US", 6))).toDF("x", "y", "z", "count")

data.dropDuplicates(Array("x","count")).show()

Output:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+
够拽才男人
#4 · 2020-01-29 04:17

PySpark does include a dropDuplicates() method: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

Maybe it was introduced in a later version than the one @Jason (the OP) was using?

Edit: yeah, it was introduced in 1.4.

混吃等死
#5 · 2020-01-29 04:25

Here my DataFrame contains the value 4 twice, so dropDuplicates removes the repeated row:

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf = df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+
ら.Afraid
#6 · 2020-01-29 04:26

I know you already accepted the other answer, but if you want to do this as a DataFrame, just use groupBy and agg. Assuming you had a DF already created (with columns named "col1", "col2", etc.), you could do:

myDF.groupBy($"col1", $"col3", $"col4").agg(max($"col2").alias("col2"))

Note that in this case I chose the max of col2, but you could use avg, min, etc. The grouping columns are retained in the result automatically, so all four columns come back.
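For comparison, a PySpark sketch of the same groupBy/agg approach (column names as assumed above):

from pyspark.sql import functions as F

# One row per (col1, col3, col4); col2 is collapsed with max().
# The grouping columns are carried into the result automatically.
myDF.groupBy('col1', 'col3', 'col4').agg(F.max('col2').alias('col2'))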

看我几分像从前
#7 · 2020-01-29 04:27

From your question, it is unclear as to which columns you want to use to determine duplicates. The general idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then you can use the reduceByKey or reduce operations to eliminate duplicates.

Here is some code to get you started:

def get_key(x):
    # Build a composite key from columns 1, 3 and 4; the separator
    # avoids collisions such as ("AB", "C") vs ("A", "BC")
    return "{0}|{1}|{2}".format(x[0], x[2], x[3])

m = data.map(lambda x: (get_key(x), x))

Now, you have a key-value RDD that is keyed by columns 1,3 and 4. The next step would be either a reduceByKey or groupByKey and filter. This would eliminate duplicates.

# For each key, keep the first row encountered
r = m.reduceByKey(lambda x, y: x)
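To finish, you can drop the synthetic keys and keep only the surviving rows; something along these lines:

# Discard the keys, leaving one row per unique (col1, col3, col4)
deduped = r.values()
print(deduped.collect())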