How to get minimum value for each distinct key usi

Posted 2020-01-21 16:43

Question:

I have a flatMap that returns the sequence Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)). Now I need to use reduceByKey() on the sequence I got from the flatMap to find the minimum value for each key.

I tried .reduceByKey(a,min(b)) and .reduceByKey((a, b) => if (a._1 < b._1) a else b), but neither works.

This is my code:

for (i <- 1 to 5) {
  var graph = graph.flatMap { in => in match { case (x, y, zs) => (x, y) :: zs.map(z => (z, y)) } }
    .reduceByKey((a, b) => if (a._1 < b._1) a else b)
}

For each distinct key that the flatMap generates, I need the minimum value for that key. For example, if the flatMap generates Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)), reduceByKey() should produce (20,1),(22,1),(23,6),(24,6).

Answer 1:

Here is the signature of reduceByKey:

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Given an RDD of key/value pairs, you need to provide a function that reduces two values (not two whole pairs) into one. The function never sees the key, so a and b are plain values and a._1 does not apply. You can use it as follows:

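// sc is the SparkContext (predefined in spark-shell)
val rdd = sc.parallelize(Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)))
// a and b are two values that share the same key; keep the smaller one
val result = rdd.reduceByKey((a, b) => if (a < b) a else b)
result.collect
// Array[(Int, Int)] = Array((24,6), (20,1), (22,1), (23,6))
// (the order of the collected pairs is not guaranteed)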
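
To see why the function receives only values, here is a minimal sketch that mimics the per-key reduction on a plain Scala Seq, without Spark. The object MinByKeySketch and the helper reduceByKeyLocal are hypothetical names made up for this illustration; they are not part of the Spark API, and the real reduceByKey works on distributed partitions rather than a local groupBy.

```scala
object MinByKeySketch {
  // Hypothetical helper (not a Spark method): emulates the semantics of
  // reduceByKey on a local Seq. func combines two VALUES of the same key.
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
    pairs
      .groupBy { case (k, _) => k }                              // key -> all pairs with that key
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }  // fold each key's values into one

  def main(args: Array[String]): Unit = {
    val pairs = Seq((20, 6), (22, 6), (23, 6), (24, 6), (20, 1), (22, 1))
    // math.min(a, b) is equivalent to: if (a < b) a else b
    val result = reduceByKeyLocal(pairs)((a, b) => math.min(a, b))
    // Sort by key only to make the printed order stable
    println(result.toSeq.sortBy(_._1))
  }
}
```

With the sample data from the question, each key ends up with its smallest value: 20 and 22 map to 1, while 23 and 24 map to 6. In Spark itself, the same function can be passed as rdd.reduceByKey((a, b) => math.min(a, b)).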