Homemade DataFrame aggregation/dropDuplicates in Spark

Posted 2019-08-27 17:09

Question:

I want to perform a transformation on my DataFrame df so that each key appears once and only once in the final DataFrame.

For machine learning purposes, I don't want any bias in my dataset. Duplicate keys should never occur, but the data I get from my data source contains this "weirdness". So if I have lines with the same keys, I want to be able to choose either a combination of them (like the mean value), a string concatenation (for labels, for example), or a randomly chosen set of values.

Say my DataFrame df looks like this:

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   U|     THOMAS|        2|
|  A|   U|    MICHAEL|        3|
|  A|   V|        TOM|        2|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
|  A|   W|     JULIEN|        3|
+---+----+-----------+---------+

I want my final DataFrame out to keep only one set of values per key, chosen randomly. It could also be another type of aggregation (say, the concatenation of all values as a string), but I just don't want to build an Integer value from it; I'd rather build new entries.

E.g. a final output could be (keeping only the first row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   V|        TOM|        2|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

Another final output could be (keeping a random row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|    MICHAEL|        3|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

Or, building a new set of values:

+---+----+--------------------------+----------+
|ID1| ID2|                      VAL1|      VAL2|
+---+----+--------------------------+----------+
|  A|   U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
|  A|   V|               (TOM, JACK)|    (2, 3)|
|  A|   W|          (MICHEL, JULIEN)|    (2, 3)|
+---+----+--------------------------+----------+

The answer should use Spark with Scala. I also want to underline that the actual schema is way more complicated than this, so I would like a generic solution. Also, I do not want to fetch only the unique values of one column; I want to filter out lines that have the same keys. Thanks!

EDIT: This is what I tried to do (but Row.get(colname) throws a NoSuchElementException: key not found...):

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.types.DataType

  def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
    // Map each field name to its index and data type in the schema
    val fields_map: Map[String, (Int, DataType)] =
      df.schema.fieldNames.map(fname => {
        val findex = df.schema.fieldIndex(fname)
        val ftype = df.schema.fields(findex).dataType
        (fname, (findex, ftype))
      }).toMap

    df.sparkSession.createDataFrame(
      df.rdd
        // key each row by the concatenation of its key-column values
        .map[(String, Row)](r =>
          (colnames.map(colname => r.get(fields_map(colname)._1).toString.replace("`", ""))
                   .reduceLeft((x, y) => "" + x + y),
           r))
        .groupByKey()
        // keep one random row per key (Utils.randomElement is my own helper)
        .map { case (x: String, y: Iterable[Row]) => Utils.randomElement(y) },
      df.schema)
  }
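For reference, one possible reason the lookup fails is that the column names passed in don't match the schema's field names exactly (e.g. they still carry backticks). Below is a minimal sketch of a fix under that assumption, looking values up by name with Row.getAs and picking a random row with scala.util.Random; the function name and the backtick-stripping are mine, not a confirmed diagnosis:

  import org.apache.spark.sql.{DataFrame, Row}
  import scala.util.Random

  def myDropDuplicatesRandomByName(df: DataFrame, colnames: Seq[String]): DataFrame = {
    // strip backticks so quoted column names match the schema's field names
    val keyNames = colnames.map(_.replace("`", ""))

    df.sparkSession.createDataFrame(
      df.rdd
        // build the key by name (Row.getAs), with a separator to avoid collisions
        .keyBy(r => keyNames.map(c => "" + r.getAs[Any](c)).mkString("\u0001"))
        .groupByKey()
        // keep one random row per key
        .map { case (_, rows) => rows.toSeq(Random.nextInt(rows.size)) },
      df.schema)
  }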

Answer 1:

Here's one approach:

import spark.implicits._  // for toDF and the $"col" syntax (spark being the SparkSession, as in spark-shell)

val df = Seq(
  ("A", "U", "PIERRE", 1),
  ("A", "U", "THOMAS", 2),
  ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2),
  ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2),
  ("A", "W", "JULIEN", 3)
).toDF("ID1", "ID2", "VAL1", "VAL2")

import org.apache.spark.sql.functions._

// Gather key/value column lists based on specific filtering criteria
val keyCols = df.columns.filter(_.startsWith("ID"))
val valCols = df.columns diff keyCols

// Group by keys to aggregate combined value-columns then re-expand
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select($"ID1", $"ID2", $"VALS.*")

// +---+---+------+----+
// |ID1|ID2|  VAL1|VAL2|
// +---+---+------+----+
// |  A|  W|MICHEL|   2|
// |  A|  V|   TOM|   2|
// |  A|  U|PIERRE|   1|
// +---+---+------+----+
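Since the question also mentions keeping a random row per key, note that first after groupBy is merely non-deterministic, not truly random. A minimal sketch of a genuinely random pick, reusing the keyCols defined above (this variant is my addition, not part of the original answer):

import org.apache.spark.sql.expressions.Window

// rank the rows of each key in a random order and keep only the top-ranked one
val byRandomOrder = Window.partitionBy(keyCols.map(col): _*).orderBy(rand())

df.withColumn("rn", row_number().over(byRandomOrder)).
  filter($"rn" === 1).
  drop("rn")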

[UPDATE]

If I understand your expanded requirement correctly, you're looking for a generic way to transform DataFrames by keys with an arbitrary agg function, like:

import org.apache.spark.sql.Column

def customAgg(keyCols: Seq[String], valCols: Seq[String], aggFcn: Column => Column) = {
  df.groupBy(keyCols.map(col): _*).
    agg(aggFcn(struct(valCols.map(col): _*)).as("VALS")).
    select($"ID1", $"ID2", $"VALS.*")
}

customAgg(keyCols, valCols, first)

I'd say that going down this path leaves you with a very limited set of applicable agg functions. While the above works for first, you would have to implement the re-expansion differently for, say, collect_list/collect_set, etc. (as sketched below). One can certainly hand-roll all the various types of agg functions, but it would likely result in unwarranted code-maintenance hassle.
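For instance, here is a rough sketch of the collect_list case, reusing the keyCols/valCols defined above (my illustration, not part of the original answer). Each value column becomes its own array, so there is no single struct column to re-expand with "VALS.*":

// collect every value of each non-key column into an array, one row per key
val listCols = valCols.map(c => collect_list(col(c)).as(c))

df.groupBy(keyCols.map(col): _*).
  agg(listCols.head, listCols.tail: _*)
// VAL1 becomes array<string> and VAL2 becomes array<int>, roughly matching
// the question's "building a new set of values" output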



Answer 2:

You can use groupBy with first and struct as below:

  import org.apache.spark.sql.functions._
  import spark.implicits._  // for toDF on the parallelized Seq

  val d1 = spark.sparkContext.parallelize(Seq(
    ("A", "U", "PIERRE", 1),
    ("A", "U", "THOMAS", 2),
    ("A", "U", "MICHAEL", 3),
    ("A", "V", "TOM", 2),
    ("A", "V", "JACK", 3),
    ("A", "W", "MICHEL", 2),
    ("A", "W", "JULIEN", 3)
  )).toDF("ID1", "ID2", "VAL1", "VAL2")


  d1.groupBy("ID1", "ID2").agg(first(struct("VAL1", "VAL2")).as("val"))
    .select("ID1", "ID2", "val.*")
    .show(false)

UPDATE: If you have the keys and values as parameters, then you can use them as below.

val keys = Seq("ID1", "ID2")

val values = Seq("VAL1", "VAL2")

d1.groupBy(keys.head, keys.tail : _*)
    .agg(first(struct(values.head, values.tail:_*)).as("val"))
    .select( "val.*",keys:_*)
    .show(false)

Output:

+---+---+------+----+
|ID1|ID2|VAL1  |VAL2|
+---+---+------+----+
|A  |W  |MICHEL|2   |
|A  |V  |TOM   |2   |
|A  |U  |PIERRE|1   |
+---+---+------+----+

I hope this helps!