如何实现在Spark UDAF一个fastutils地图?(How do I implement a

2019-10-31 07:54发布

我建立一个火花UDAF其中我存储在一个fastutils地图的中间数据。 模式是这样的:

def bufferSchema = new StructType().add("my_map_col", MapType(StringType, IntegerType))

我初始化没有问题:

def initialize(buffer: MutableAggregationBuffer) = {
   buffer(0) = new Object2IntOpenHashMap[String]()
}

问题是当我尝试更新:

def update(buffer: MutableAggregationBuffer, input: Row) = { 
  val myMap = buffer.getAs[Object2IntOpenHashMap[String]](0)
  myMap.put(input.getAs[String](0), 1)
  buffer(0) = myMap
}

收到以下错误:

Caused by: java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap

什么办法可以使这项工作?

Answer 1:

什么办法可以使这项工作?

并不是的。 这个

buffer.getAs[Object2IntOpenHashMap[String]](0)

相当于

buffer.get(0).asInstanceOf[Object2IntOpenHashMap[String]]]

和外部类型MapTypescala.collection.Map

实际上它是一个死胡同反正- UserDefinedAggregate功能使数据的完整副本在每次调用 。 你可能有一个更好的运气Aggregator (如链接的问题)。



文章来源: How do I implement a fastutils map in a Spark UDAF?