Spark 2.0 DataSets groupByKey and divide operation

2019-06-21 19:07发布

I am very much pleased with Spark 2.0 DataSets because of it's compile time type safety. But here is couple of problem that I am not able to work out, I also didn't find good documentation for this.

Problem #1 - divide operation on aggregated column- Consider below code - I have a DataSet[MyCaseClass] and I wanted to groupByKey on c1,c2,c3 and sum(c4) / 8. The below code works well if I just calculate the sum but it gives compile time error for divide(8). I wonder how I can achieve following.

final case class MyClass (c1: String,
                          c2: String,
                          c3: String,
                          c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

       groupByKey(myCaseClass =>
          (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
          divide(8)). //this is breaking with exception

If I remove .divide(8) operation and run above command it gives me below output.

|        key|sum(c4)      |
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  

Problem #2 - converting groupedByKey result to another Typed DataFrame - Now second part of my problem is I want output again a typed DataSet. For that I have another case class (not sure if it is needed) but I am not sure how to map with grouped result -

final case class AnotherClass(c1: String,
                          c2: String,
                          c3: String,
                          average: Double) 

           groupByKey(myCaseClass =>
              (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
as[AnotherClass] //this is breaking with exception

but this again fails with an exception as grouped by key result is not directly mapped with AnotherClass.

PS : any other solution to achieve above is more than welcome.

2楼-- · 2019-06-21 19:52

The first problem can be resolved by using typed columns all the way down (KeyValueGroupedDataset.agg expects TypedColumn(-s)) You can defined aggregation result as:

val eight = lit(8.0)
  .as[Double]  // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
  .as[Double]  // Required
  .name("div(sum(c4), 8)")

and plug it into following code:

val myCaseClass = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)

  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))

to get

|    key|div(sum(c4), 8)|
|[a,b,c]|          0.625|

The second problem is a result of using a class which doesn't conform to a data shape. A correct representation could be:

case class AnotherClass(key: (String, String, String), sum: Double)

which used with data defined above:

   .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))

would give:

|    key|sum|

but .as[AnotherClass] is not necessary here if Dataset[((String, String, String), Double)] is acceptable.

You can of course skip all of that and just mapGroups (although not without performance penalty):

import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
 .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
 .mapGroups((group, iter) => group :+

with result

| _1| _2| _3| _4|
|  a|  b|  c|5.0|

reduceGroups could be a better option:

  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))

with resulting Dataset:

|     _1|         _2|
登录 后发表回答