Can reduceByKey be used to change the value type and combine values?

Posted 2019-05-23 03:27

In the code below I'm attempting to combine values:

val rdd: org.apache.spark.rdd.RDD[((String), Double)] =
    sc.parallelize(List(
      (("a"), 1.0),
      (("a"), 3.0),
      (("a"), 2.0)
      ))

val reduceByKey = rdd.reduceByKey((a , b) => String.valueOf(a) + String.valueOf(b))

reduceByKey should contain (a, 1,3,2), but I receive a compile-time error:

Multiple markers at this line - type mismatch; found : String required: Double - type mismatch; found : String required: Double

What determines the type of the reduce function? Can the type not be converted?

I could use groupByKey to achieve the same result, but I just want to understand reduceByKey.

2 Answers
Explosion°爆炸
Reply #2 · 2019-05-23 04:08

No. Given an RDD of type RDD[(K, V)], reduceByKey takes an associative and commutative function of type (V, V) => V, so the result must have the same type as the values.

If we want to apply a reduction that changes the type of the values to another arbitrary type, then we can use aggregateByKey:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

Using the zeroValue and the seqOp function, it provides a fold-like operation on the map side, while the associative function combOp combines the results of seqOp into the final result, much as reduceByKey would. As the signature shows, while the collection's values are of type V, the result of aggregateByKey can be of an arbitrary type U.

Applied to the example above, aggregateByKey would look like this:

rdd.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value)}, (aggr1, aggr2) => aggr1 + aggr2)
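
To make the two phases concrete, here is a minimal plain-Scala model of what aggregateByKey does; it is a sketch for illustration, not Spark's implementation. The names AggregateModel and aggregateByKeyLocal, and the split of the data into two partitions, are assumptions made up for this example.

```scala
// Plain-Scala model of aggregateByKey's two phases; no Spark required.
// AggregateModel and aggregateByKeyLocal are hypothetical names for illustration.
object AggregateModel {
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Phase 1: within each partition, fold each key's values starting from zeroValue.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zeroValue)(seqOp)
      }
    }
    // Phase 2: merge the per-partition results for each key with combOp.
    perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
      k -> kus.map(_._2).reduce(combOp)
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed layout: the three values for key "a" are spread over two partitions.
    val partitions = Seq(
      Seq(("a", 1.0), ("a", 3.0)),
      Seq(("a", 2.0)))
    val result = aggregateByKeyLocal(partitions, "")(
      (aggr, value) => aggr + String.valueOf(value),
      (aggr1, aggr2) => aggr1 + aggr2)
    println(result) // prints Map(a -> 1.03.02.0)
  }
}
```

Note that the values are concatenated without a separator, and that in a real Spark job the order in which values and partial results are combined depends on partitioning, so the order of the pieces in the final string should not be relied on.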
等我变得足够好
Reply #3 · 2019-05-23 04:22

The problem with your code is a value type mismatch: the values in your RDD are Double, but your function returns a String. You can achieve the same output with reduceByKey, provided you change the value type in your RDD.

val rdd: org.apache.spark.rdd.RDD[((String), String)] =
    sc.parallelize(List(
      ("a", "1.0"),
      ("a", "3.0"),
      ("a", "2.0")
      ))

val reduceByKey = rdd.reduceByKey((a, b) => a.concat(b))

This is the same example with String values. As long as the function you pass to reduceByKey takes two parameters of the value type (Double in your case) and returns a single value of that same type, your reduceByKey will work.
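
If the RDD already holds Double values, one way to get there is to convert the values first and reduce afterwards. The following is a minimal sketch, assuming Spark is on the classpath; the names ConvertThenReduce and joinValues and the comma separator are choices made for this example only.

```scala
import org.apache.spark.rdd.RDD

// ConvertThenReduce and joinValues are hypothetical names for illustration.
object ConvertThenReduce {
  // mapValues turns RDD[(String, Double)] into RDD[(String, String)],
  // so the function passed to reduceByKey is (String, String) => String.
  def joinValues(rdd: RDD[(String, Double)]): RDD[(String, String)] =
    rdd.mapValues(_.toString).reduceByKey((a, b) => a + "," + b)
}
```

Calling ConvertThenReduce.joinValues(rdd).collect() on the RDD from the question should yield a single pair for key "a" whose value contains 1.0, 3.0 and 2.0 separated by commas; the order of those pieces is not guaranteed.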
