I am trying to use custom accumulator in Apache Spark to accumulate pairs in a list.
The result should have List[(Int, Int)]
type. For this I creat custom accumulator:
import org.apache.spark.AccumulatorParam
class AccumPairs extends AccumulatorParam[List[(Int,Int)]] {
def zero(initialValue: List[(Int,Int)]): List[(Int,Int)] = {
List()
}
def addInPlace(l1: List[(Int,Int)], l2: List[(Int,Int)]): List[(Int,Int)] = {
l1 ++ l2
}
}
Yet I can not instantiate variable of this type.
val pairAccum = sc.accumulator(new List():List[(Int,Int)])(AccumPairs)
results in error. Please help.