In code below I'm attempting to combine values:
val rdd: org.apache.spark.rdd.RDD[(String, Double)] =
  sc.parallelize(List(
    ("a", 1.0),
    ("a", 3.0),
    ("a", 2.0)
  ))
val reduceByKey = rdd.reduceByKey((a, b) => String.valueOf(a) + String.valueOf(b))
reduceByKey should contain the key "a" with the values 1, 3 and 2 concatenated, but instead I receive a compile-time error:
Multiple markers at this line
- type mismatch; found: String, required: Double
- type mismatch; found: String, required: Double
What determines the type of the reduce function? Can the type not be converted?
I could use groupByKey to achieve the same result, but I just want to understand reduceByKey.
No, given an RDD of type RDD[(K,V)], reduceByKey will take an associative function of type (V,V) => V.
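For example, a reducer that keeps the value type Double compiles fine on the RDD from the question (a minimal sketch; the summed output in the comment is just an illustration based on the sample data):
val sums = rdd.reduceByKey((a, b) => a + b) // (Double, Double) => Double matches the value type V
sums.collect() // Array((a,6.0)) with the sample data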
If we want to apply a reduction that changes the type of the values to another arbitrary type, then we can use aggregateByKey:
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)
Using the zeroValue and the seqOp function, it provides a fold-like operation on the map side, while the associative function combOp combines the results of the seqOp into the final result, much like reduceByKey would do.
As we can appreciate from the signature, while the collection values are of type V, the result of aggregateByKey will be of an arbitrary type U.
Applied to the example above, aggregateByKey would look like this:
rdd.aggregateByKey("")(
  (aggr, value) => aggr + String.valueOf(value),
  (aggr1, aggr2) => aggr1 + aggr2
)
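With the sample data, collecting this RDD should produce something like Array((a,1.03.02.0)), though the exact order in which the values are concatenated can depend on how the data is partitioned.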
The problem with your code is a value type mismatch. You can achieve the same output with reduceByKey, provided you change the type of the values in your RDD.
val rdd: org.apache.spark.rdd.RDD[(String, String)] =
  sc.parallelize(List(
    ("a", "1.0"),
    ("a", "3.0"),
    ("a", "2.0")
  ))
val reduceByKey = rdd.reduceByKey((a, b) => a.concat(b))
This is essentially the same example. As long as the function you pass to reduceByKey takes two parameters of the value type (Double in your case) and returns a single value of the same type, your reduceByKey will work.
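If you would rather keep the original RDD[(String, Double)], one option (a sketch, not taken from the answers above) is to convert the values to String with mapValues before concatenating with reduceByKey:
val doubleRdd: org.apache.spark.rdd.RDD[(String, Double)] =
  sc.parallelize(List(("a", 1.0), ("a", 3.0), ("a", 2.0)))

val concatenated = doubleRdd
  .mapValues(_.toString)              // value type becomes String
  .reduceByKey((a, b) => a.concat(b)) // (String, String) => String matches the new value type

concatenated.collect() // e.g. Array((a,1.03.02.0)); concatenation order may vary with partitioning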