自制据帧聚合/ dropDuplicates星火(Homemade DataFrame aggreg

2019-11-04 15:38发布

我想在我的数据帧进行改造df所以,我只在最后的数据帧都有每个键一次且仅一次。

对于机器学习的目的,我不希望在我的数据集的偏向。 这应该不会发生,但我从我的数据源获取数据中包含这种“怪事”。 所以,如果我有相同的键线,我希望能够选择一个或者两个(如平均值)的组合或字符串连接(标签为例)或设置一个随机值。

说我的数据帧df看起来是这样的:

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   U|     THOMAS|        2|
|  A|   U|    MICHAEL|        3|
|  A|   V|        TOM|        2|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
|  A|   W|     JULIEN|        3|
+---+----+-----------+---------+

我希望我的最后数据帧out到只保留一组每个键值,随机。 这可能是另一种类型的聚合(说所有值的串联为一个字符串),但我只是不希望从它建立一个整数值,而不是建立新的条目。

例如。 最终的输出可以是(仅保留每个键的第一行):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   V|        TOM|        2|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

另一个最终输出可能是(保持每个键排随机):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|    MICHAEL|        3|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

或者,建立一套新的价值观:

+---+----+--------------------------+----------+
|ID1| ID2|                      VAL1|      VAL2|
+---+----+--------------------------+----------+
|  A|   U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
|  A|   V|               (TOM, JACK)|    (2, 3)|
|  A|   W|          (MICHEL, JULIEN)|    (2, 3)|
+---+----+--------------------------+----------+

答案应该使用星火使用Scala。 我也想强调的是,实际模式比这样更复杂,我想达到一个通用的解决方案。 另外,我不想从一列取唯一的值,但筛选出具有相同的键线。 谢谢!

编辑这是我试图这样做(但Row.get(colname)抛出一个NoSuchElementException: key not found... ):

  def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
    val fields_map: Map[String, (Int, DataType)] =
      df.schema.fieldNames.map(fname => {
        val findex = df.schema.fieldIndex(fname)
        val ftype = df.schema.fields(findex).dataType
        (fname, (findex, ftype))
      }).toMap[String, (Int, DataType)]

    df.sparkSession.createDataFrame(
      df.rdd
        .map[(String, Row)](r => (colnames.map(colname => r.get(fields_map(colname)._1).toString.replace("`", "")).reduceLeft((x, y) => "" + x + y), r))
        .groupByKey()
        .map{case (x: String, y: Iterable[Row]) => Utils.randomElement(y)}
    , df.schema)
  }

Answer 1:

这里有一个方法:

val df = Seq(
  ("A", "U", "PIERRE", 1),
  ("A", "U", "THOMAS", 2),
  ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2),
  ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2),
  ("A", "W", "JULIEN", 3)
).toDF("ID1", "ID2", "VAL1", "VAL2")

import org.apache.spark.sql.functions._

// Gather key/value column lists based on specific filtering criteria
val keyCols = df.columns.filter(_.startsWith("ID"))
val valCols = df.columns diff keyCols

// Group by keys to aggregate combined value-columns then re-expand
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select($"ID1", $"ID2", $"VALS.*")

// +---+---+------+----+
// |ID1|ID2|  VAL1|VAL2|
// +---+---+------+----+
// |  A|  W|MICHEL|   2|
// |  A|  V|   TOM|   2|
// |  A|  U|PIERRE|   1|
// +---+---+------+----+

[UPDATE]

如果我正确理解你的需求扩大,你正在寻找一种通用的方式通过键与任意变换dataframes agg的功能,如:

import org.apache.spark.sql.Column

def customAgg(keyCols: Seq[String], valCols: Seq[String], aggFcn: Column => Column) = {
  df.groupBy(keyCols.map(col): _*).
    agg(aggFcn(struct(valCols.map(col): _*)).as("VALS")).
    select($"ID1", $"ID2", $"VALS.*")
}

customAgg(keyCols, valCols, first)

我会说,走这条路将导致非常有限的适用agg功能。 虽然上述作品first ,你必须执行不同的,比如说, collect_list/collect_set等人们肯定能够手工卷的所有各类agg功能,但它可能会导致不必要的代码维护的麻烦。



Answer 2:

您可以使用groupByfirststruct如下

  import org.apache.spark.sql.functions._

  val d1 = spark.sparkContext.parallelize(Seq(
    ("A", "U", "PIERRE", 1),
    ("A", "U", "THOMAS", 2),
    ("A", "U", "MICHAEL", 3),
    ("A", "V", "TOM", 2),
    ("A", "V", "JACK", 3),
    ("A", "W", "MICHEL", 2),
    ("A", "W", "JULIEN", 3)
  )).toDF("ID1", "ID2", "VAL1", "VAL2")


  d1.groupBy("ID1", "ID2").agg(first(struct("VAL1", "VAL2")).as("val"))
    .select("ID1", "ID2", "val.*")
    .show(false)

更新:如果你有键和值作为参数,那么你可以按照以下使用。

val keys = Seq("ID1", "ID2")

val values = Seq("VAL1", "VAL2")

d1.groupBy(keys.head, keys.tail : _*)
    .agg(first(struct(values.head, values.tail:_*)).as("val"))
    .select( "val.*",keys:_*)
    .show(false)

输出:

+---+---+------+----+
|ID1|ID2|VAL1  |VAL2|
+---+---+------+----+
|A  |W  |MICHEL|2   |
|A  |V  |TOM   |2   |
|A  |U  |PIERRE|1   |
+---+---+------+----+

我希望这有帮助!



文章来源: Homemade DataFrame aggregation/dropDuplicates Spark