Spark: Creating a new accumulator type won't work

Posted 2019-08-24 02:44

Question:

I want to create an accumulator for lists of type List[(String, String)]. I first created the following object:

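import org.apache.spark.AccumulatorParam

// Accumulator parameter for lists of (String, String) pairs.
object ListAccumulator extends AccumulatorParam[List[(String, String)]] {
  // The zero value is always the empty list; initialValue is ignored.
  def zero(initialValue: List[(String, String)]): List[(String, String)] = {
    Nil
  }

  // Merge two partial results by concatenating them.
  def addInPlace(list1: List[(String, String)], list2: List[(String, String)]): List[(String, String)] = {
    list1 ::: list2
  }
}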

In the same file (SparkQueries.scala) I tried to use it within a function in my class:

val resultList = sc.accumulator(Nil)(ListAccumulator)

However, here my compiler complains at (ListAccumulator). The following error occurs:

type mismatch; found : sparkMain.ListAccumulator.type required: org.apache.spark.AccumulatorParam[scala.collection.immutable.Nil.type] Note: List[(String, String)] >: scala.collection.immutable.Nil.type (and sparkMain.ListAccumulator.type <: org.apache.spark.AccumulatorParam[List[(String, String)]]), but trait AccumulatorParam is invariant in type T. You may wish to define T as -T instead.

sparkMain is the package the .scala file is in. What am I doing wrong? Is it possible the compiler doesn't know of the existence of the ListAccumulator object?

Thanks in advance!

Answer 1:

You can fix your type error like this:

val resultList = sc.accumulator(ListAccumulator.zero(Nil))(ListAccumulator)

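The culprit is Scala's type inference: it takes the most specific type of the argument, Nil.type (the type of the empty list), as the type parameter T of your accumulator. Because AccumulatorParam is invariant in T, ListAccumulator, which is an AccumulatorParam[List[(String, String)]], is then rejected. Calling zero, which has an explicit return type of List[(String, String)], gives the compiler enough information to know what you mean.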
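The same inference problem can be reproduced without Spark, which also shows a few other ways to pin down the type. The sketch below is an illustration only: Param, ListParam, and InferenceDemo.accumulator are hypothetical stand-ins that mimic the shape of AccumulatorParam and sc.accumulator (a type parameter fixed by the first argument list), not Spark's actual classes. The stand-in accumulator simply returns the zero value instead of an Accumulator.

```scala
// Hypothetical stand-in for Spark's invariant AccumulatorParam[T].
trait Param[T] {
  def zero(initialValue: T): T
  def addInPlace(a: T, b: T): T
}

// Hypothetical counterpart of ListAccumulator from the question.
object ListParam extends Param[List[(String, String)]] {
  def zero(initialValue: List[(String, String)]): List[(String, String)] = Nil
  def addInPlace(a: List[(String, String)], b: List[(String, String)]): List[(String, String)] =
    a ::: b
}

object InferenceDemo {
  // Same shape as sc.accumulator: T is decided from the first argument list.
  def accumulator[T](initialValue: T)(param: Param[T]): T = param.zero(initialValue)

  // accumulator(Nil)(ListParam)
  // does not compile: T is inferred from Nil alone, so ListParam does not match

  // Option 1: state the type parameter explicitly.
  val explicit = accumulator[List[(String, String)]](Nil)(ListParam)

  // Option 2: ascribe the wider type to the argument.
  val ascribed = accumulator(Nil: List[(String, String)])(ListParam)

  // Option 3: build an empty list that already has the right element type.
  val typedEmpty = accumulator(List.empty[(String, String)])(ListParam)
}
```

Assuming sc.accumulator has the signature the error message implies, the same three forms should carry over, for example sc.accumulator[List[(String, String)]](Nil)(ListAccumulator).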

A side remark: addInPlace uses list concatenation (:::), which takes time linear in the length of the left-hand list. If your lists can get large, each addition will be slow. If you need efficient appends, use a ListBuffer or ArrayBuffer, or a Vector if you want an immutable sequence.
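As a rough sketch of that suggestion, here is how chunks of pairs could be collected with a ListBuffer or with a Vector instead of repeated ::: calls. AppendDemo and its method names are made up for this example, and it runs outside Spark; wiring a buffer into an accumulator is left out.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper object, for illustration only.
object AppendDemo {
  type Pair = (String, String)

  // Mutable variant: appending a chunk costs time proportional to the chunk,
  // not to everything collected so far.
  def collectWithBuffer(chunks: Seq[List[Pair]]): List[Pair] = {
    val buf = ListBuffer.empty[Pair]
    chunks.foreach(chunk => buf ++= chunk)
    buf.toList
  }

  // Immutable variant: a Vector supports appends without copying the
  // whole left-hand side the way List's ::: does.
  def collectWithVector(chunks: Seq[List[Pair]]): Vector[Pair] =
    chunks.foldLeft(Vector.empty[Pair])((acc, chunk) => acc ++ chunk)
}
```

Both variants keep the elements in the order the chunks arrive, matching what list1 ::: list2 produces.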