Spark: Creating a new accumulator type doesn't work

Posted 2019-08-24 02:57

I want to create an accumulator for lists of type List[(String, String)]. I first created the following object:

import org.apache.spark.AccumulatorParam

object ListAccumulator extends AccumulatorParam[List[(String, String)]] {
  def zero(initialValue: List[(String, String)]): List[(String, String)] = {
    Nil
  }

  def addInPlace(list1: List[(String, String)], list2: List[(String, String)]): List[(String, String)] = {
    list1 ::: list2
  }
}

In the same file (SparkQueries.scala) I tried to use it within a function in my class:

val resultList = sc.accumulator(Nil)(ListAccumulator)

However, here my compiler complains at (ListAccumulator). The following error occurs:

type mismatch; found : sparkMain.ListAccumulator.type required: org.apache.spark.AccumulatorParam[scala.collection.immutable.Nil.type] Note: List[(String, String)] >: scala.collection.immutable.Nil.type (and sparkMain.ListAccumulator.type <: org.apache.spark.AccumulatorParam[List[(String, String)]]), but trait AccumulatorParam is invariant in type T. You may wish to define T as -T instead.

sparkMain is the package the .scala file is in. What am I doing wrong? Is it possible the compiler doesn't know of the existence of the ListAccumulator object?

Thanks in advance!

1 answer
霸刀☆藐视天下
#2 · 2019-08-24 03:00

You can fix your type error like this:

val resultList = sc.accumulator(ListAccumulator.zero(Nil))(ListAccumulator)

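The problem is Scala's type inference. The compiler picks the most specific type for the initial value, which is Nil.type (the singleton type of the empty list), and uses it as the accumulator's type parameter T. Because AccumulatorParam is invariant in T, your AccumulatorParam[List[(String, String)]] is not accepted where an AccumulatorParam[Nil.type] is required. Calling zero, whose declared return type is List[(String, String)], gives the compiler enough information to know which type you mean.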
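The inference issue can be reproduced without Spark. The following is a minimal sketch, not Spark's actual API: Param and accumulator are hypothetical stand-ins that only mimic the shape of AccumulatorParam and sc.accumulator (an invariant type parameter, fixed by the first argument list). It also shows two other ways to pin the type, an explicit type argument and a typed empty list, which should carry over to the real call in the same way.

```scala
object InferenceSketch {
  type Pairs = List[(String, String)]

  // Hypothetical stand-in for AccumulatorParam: invariant in T.
  trait Param[T] {
    def zero(initialValue: T): T
    def addInPlace(a: T, b: T): T
  }

  object ListParam extends Param[Pairs] {
    def zero(initialValue: Pairs): Pairs = Nil
    def addInPlace(a: Pairs, b: Pairs): Pairs = a ::: b
  }

  // Hypothetical stand-in for sc.accumulator: T is chosen from the
  // first argument list, before `param` is looked at.
  def accumulator[T](initialValue: T)(param: Param[T]): T =
    param.zero(initialValue)

  // accumulator(Nil)(ListParam)
  // Rejected as in the question: T is inferred as Nil.type.

  // Ways to pin T to List[(String, String)]:
  val viaTypeArg: Pairs   = accumulator[Pairs](Nil)(ListParam)               // explicit type argument
  val viaAscription: Pairs = accumulator(Nil: Pairs)(ListParam)              // type ascription
  val viaEmpty: Pairs     = accumulator(List.empty[(String, String)])(ListParam) // typed empty list
}
```

Any of these variants does the same job as ListAccumulator.zero(Nil): each one hands the compiler a value whose static type is already List[(String, String)].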

A side remark: you are using list concatenation (:::) for addInPlace, which is linear in the size of the left-hand list. If your accumulated list can get large, each addition will be slow. If you need efficient appends, use a ListBuffer or an ArrayBuffer, or a Vector if you want an immutable sequence.
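To make the difference concrete, here is a small Spark-free sketch of the three append styles. The object and method names (AppendSketch, addList, addBuffer, addVector) are made up for illustration; only the collection operations themselves come from the Scala standard library.

```scala
import scala.collection.mutable.ListBuffer

object AppendSketch {
  type Pair = (String, String)

  // Immutable List: `acc ::: more` copies every cell of `acc` on each call,
  // so the cost grows with the size of the accumulated list.
  def addList(acc: List[Pair], more: List[Pair]): List[Pair] =
    acc ::: more

  // ListBuffer: `++=` appends in place; the work depends on `more`,
  // not on how much has already been accumulated.
  def addBuffer(acc: ListBuffer[Pair], more: Seq[Pair]): ListBuffer[Pair] =
    acc ++= more

  // Vector: stays immutable, and appending elements at the end is
  // effectively constant time per element.
  def addVector(acc: Vector[Pair], more: Seq[Pair]): Vector[Pair] =
    acc ++ more
}
```

All three produce the same elements in the same order; they differ only in how much of the existing data is touched on each addition.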
