I want to perform a transformation on my DataFrame df
so that each key appears once and only once in the final DataFrame.
For machine learning purposes, I don't want to have a bias in my dataset. Duplicate keys should never occur, but the data I get from my data source contains this "weirdness", so when several lines share the same keys I want to be able to choose either a combination of their values (like the mean), a string concatenation (for labels, for example), or a randomly picked set of values.
Say my DataFrame df
looks like this:
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| PIERRE| 1|
| A| U| THOMAS| 2|
| A| U| MICHAEL| 3|
| A| V| TOM| 2|
| A| V| JACK| 3|
| A| W| MICHEL| 2|
| A| W| JULIEN| 3|
+---+----+-----------+---------+
I want my final DataFrame out
to keep only one set of values per key, chosen randomly. It could also be another type of aggregation (say, the concatenation of all values as a string), but I do not want to build a single Integer value out of them; I would rather build new entries.
E.g. a final output could be (keeping only the first row per key):
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| PIERRE| 1|
| A| V| TOM| 2|
| A| W| MICHEL| 2|
+---+----+-----------+---------+
Another final output could be (keeping a random row per key):
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| MICHAEL| 3|
| A| V| JACK| 3|
| A| W| MICHEL| 2|
+---+----+-----------+---------+
Or, building a new set of values:
+---+----+--------------------------+----------+
|ID1| ID2| VAL1| VAL2|
+---+----+--------------------------+----------+
| A| U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
| A| V| (TOM, JACK)| (2, 3)|
| A| W| (MICHEL, JULIEN)| (2, 3)|
+---+----+--------------------------+----------+
The answer should use Spark with Scala. I also want to stress that the actual schema is far more complicated than this one, so I would like a generic solution. Also, I do not want to fetch only the unique values of a single column; I want to filter out the lines that share the same keys. Thanks!
EDIT: This is what I tried to do (but Row.get(colname) throws a NoSuchElementException: key not found...):
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.DataType

def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
  // Map every field name of the schema to its index and data type
  val fields_map: Map[String, (Int, DataType)] =
    df.schema.fieldNames.map { fname =>
      val findex = df.schema.fieldIndex(fname)
      val ftype = df.schema.fields(findex).dataType
      (fname, (findex, ftype))
    }.toMap

  df.sparkSession.createDataFrame(
    df.rdd
      // Build a string key by concatenating the values of the key columns
      .map[(String, Row)] { r =>
        val key = colnames
          .map(colname => r.get(fields_map(colname)._1).toString.replace("`", ""))
          .reduceLeft((x, y) => "" + x + y)
        (key, r)
      }
      // Group all rows sharing the same key and keep one of them at random
      // (Utils.randomElement is my own helper)
      .groupByKey()
      .map { case (x: String, y: Iterable[Row]) => Utils.randomElement(y) },
    df.schema)
}
You can use groupBy with first and struct, as below. UPDATE: if you have the key and value column names as parameters, you can use the second, generic sketch below.
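A minimal sketch of that idea on the sample schema (column names taken from the question; keepFirstPerKey is a hypothetical name for the parameterized variant, not necessarily the original code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, first, struct}

// Fixed-schema version: one row per (ID1, ID2), keeping the first
// struct of (VAL1, VAL2) seen per key (first() is not order-guaranteed).
val out = df
  .groupBy("ID1", "ID2")
  .agg(first(struct("VAL1", "VAL2")).as("vals"))
  .select(col("ID1"), col("ID2"), col("vals.VAL1"), col("vals.VAL2"))

// Parameterized version, assuming the key and value column names are passed in:
def keepFirstPerKey(df: DataFrame, keys: Seq[String], values: Seq[String]): DataFrame = {
  df.groupBy(keys.map(col): _*)
    .agg(first(struct(values.map(col): _*)).as("vals"))
    .select(keys.map(col) ++ values.map(v => col(s"vals.$v").as(v)): _*)
}

// usage: keepFirstPerKey(df, Seq("ID1", "ID2"), Seq("VAL1", "VAL2"))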
Output: one row per (ID1, ID2) key; since first() gives no ordering guarantee, exactly which of the duplicate rows survives may vary.
I hope this helps!
Here's one approach:
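As a sketch of what such an approach could look like, assuming it groups on the key columns and keeps first() of every other column (dedupByKeys is a hypothetical name; the answer's original snippet may have differed):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, first}

// Keep exactly one row per key by taking first() of every non-key column.
// Assumes there is at least one non-key column; first() after groupBy
// does not guarantee which of the duplicate rows is kept.
def dedupByKeys(df: DataFrame, keys: Seq[String]): DataFrame = {
  val aggExprs = df.columns
    .filterNot(keys.contains)
    .map(c => first(col(c)).as(c))
  df.groupBy(keys.map(col): _*).agg(aggExprs.head, aggExprs.tail: _*)
}

// usage: val out = dedupByKeys(df, Seq("ID1", "ID2"))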
[UPDATE]
If I understand your expanded requirement correctly, you're looking for a generic way to transform DataFrames by keys with an arbitrary agg function, like the hypothetical signature sketched at the end of this answer. I'd say that going down this path would leave you with a very limited set of applicable agg functions. While the above works for first, you would have to implement it differently for, say, collect_list/collect_set, etc. One can certainly hand-roll all the various kinds of agg functions, but it would likely result in unwarranted code-maintenance hassle.
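For illustration only, such a generic entry point might be sketched like this (transformByKeys is a hypothetical name, not the answer's actual code):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, collect_list, first}

// Hypothetical generic signature: group on the key columns and apply a
// caller-supplied aggregation to every remaining column.
def transformByKeys(df: DataFrame, keys: Seq[String], aggFn: Column => Column): DataFrame = {
  val aggExprs = df.columns
    .filterNot(keys.contains)
    .map(c => aggFn(col(c)).as(c))
  df.groupBy(keys.map(col): _*).agg(aggExprs.head, aggExprs.tail: _*)
}

// e.g. one (first) row per key:       transformByKeys(df, Seq("ID1", "ID2"), c => first(c))
// e.g. all values collected per key:  transformByKeys(df, Seq("ID1", "ID2"), c => collect_list(c))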