Mapping values returns nothing in Scala Flink

Posted 2019-07-26 02:02

Question:

I am developing a discretization algorithm in Flink, but I am having problems applying a map function.

The discretization is stored in V, which is defined as:

private[this] val V = Vector.tabulate(nAttrs)(i => IntervalHeap(nBins, i, s))

This Vector is updated in the following method:

private[this] def updateSamples(v: LabeledVector): Vector[IntervalHeap] = {
  val attrs = v.vector.map(_._2)
  // TODO: Check for missing values
  attrs
    .zipWithIndex
    .foreach {
      case (attr, i) =>
        if (V(i).nInstances < s) {
          V(i) insert (attr)
        } else {
          if (randomReservoir(0) <= s / (i + 1)) {
            val randVal = Random nextInt (s)
            V(i) replace (randVal, attr)
          }
        }
    }
  V
}

The map function then applies `updateSamples` to each instance of the dataset:

def discretize(data: DataSet[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
  val d = data map (x => updateSamples(x))
  log.debug(s"$V")
  d.print()
}

However, when I print d, V appears empty in every record:

Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])
Vector(Attr [0]         [;][;][;][;][;])

I've tried not returning V from updateSamples and instead accessing it from discretize after the map is applied, but the same thing happens. If I print the value of V inside updateSamples, I can see that it is being updated.
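One detail that affects reading V from discretize: in Flink's DataSet API, a transformation such as map only adds an operator to the job plan, and the job actually runs when a sink such as print() (or env.execute()) triggers it. So the log.debug(s"$V") line runs on the driver before any record has been processed. The snippet below is only a plain-Scala analogy, not Flink code: a collection view stands in for the unexecuted plan, and LazyDemo is a hypothetical name.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyDemo {
  // Returns (samples recorded before forcing, samples recorded after forcing).
  def run(): (Int, Int) = {
    val seen = ArrayBuffer.empty[Int]
    // Building the pipeline runs nothing, like `data map (x => updateSamples(x))`.
    val plan = Seq(1, 2, 3).view.map { x => seen += x; x * 2 }
    val before = seen.size // analogous to log.debug(s"$V") before the job runs
    plan.toList            // forcing the view, analogous to calling print()
    (before, seen.size)
  }
}
```

Calling LazyDemo.run() yields (0, 3): nothing is recorded until the view is forced.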

Update

If I don't use the container DataSet[T] and instead use a Seq like this:

def discretize(data: Seq[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
  data map (x => updateSamples(x))
}

The discretization works fine.

What could be happening?
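A likely reason the Seq version behaves differently: with a Seq, updateSamples runs in the same JVM on the very object that owns V. When Flink runs a map function, the function and everything its closure captures (here, the enclosing object that holds V) are serialized and shipped, and each parallel task works on its own deserialized copy, so updates land in those copies rather than in the driver's V. The sketch below imitates only that shipping step with a plain Java serialization round trip; SampleHolder and ShipDemo are hypothetical names, and this approximates Flink's behavior rather than reproducing its code path.

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the object that owns V in the question.
class SampleHolder extends Serializable {
  val samples: ArrayBuffer[Double] = ArrayBuffer.empty[Double]
  def update(x: Double): Unit = samples += x
}

object ShipDemo {
  // Serialize and deserialize an object, roughly what happens when a user
  // function (and everything it captures) is shipped to a parallel task.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes with this code's class loader (helps in script/REPL runners).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (driver-side sample count, task-side sample count).
  def run(): (Int, Int) = {
    val onDriver = new SampleHolder
    val inTask = roundTrip(onDriver) // the "task" works on a copy
    Seq(1.0, 2.0, 3.0).foreach(inTask.update)
    (onDriver.samples.size, inTask.samples.size)
  }
}
```

ShipDemo.run() yields (0, 3): the copy is updated while the original stays empty, which matches what the question observes for V on the driver.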

Update 2

After a few days of searching, it seems the problem is Guava's MinMaxPriorityQueue collection. However, I cannot find anything that helps me serialize this collection. Here is what I've found so far:

  • How do you serialize a guava collection?
  • magro/kryo-serializers: a collection of Kryo serializers for Guava's collections, but MinMaxPriorityQueue is not among them.
  • I've tried this, which consists of registering a Kryo serializer, env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer]), but it does not work.
  • I've also tried adding a default Kryo serializer, with no luck: env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
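One general workaround, sketched here under stated assumptions, is to avoid serializing the problematic queue object directly: keep it in a transient field and write only its bound and elements, rebuilding it on read. This sketch uses plain Java serialization so it runs without Flink, Kryo or Guava; SampleHeap is a hypothetical non-serializable stand-in for the Guava queue, and SerializableSamples and SamplesDemo are hypothetical names, not part of the question's code.

```scala
import java.io._

// Hypothetical bounded sample store standing in for a queue type that does
// not implement java.io.Serializable (it deliberately omits it here).
final class SampleHeap(val maxSize: Int) {
  private var items = Vector.empty[Double]
  def insert(x: Double): Unit = if (items.size < maxSize) items = items :+ x
  def elements: Vector[Double] = items
}

// Serializable wrapper: the heap is transient; only its bound and contents are
// written, and the heap is rebuilt from them when the object is read back.
final class SerializableSamples(initialMax: Int) extends Serializable {
  @transient private var heap: SampleHeap = new SampleHeap(initialMax)

  def insert(x: Double): Unit = heap.insert(x)
  def elements: Vector[Double] = heap.elements

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(heap.maxSize)
    out.writeObject(heap.elements.toArray)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val max = in.readInt()
    val xs = in.readObject().asInstanceOf[Array[Double]]
    heap = new SampleHeap(max)
    xs.foreach(heap.insert)
  }
}

object SamplesDemo {
  def roundTrip(obj: AnyRef): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject() finally in.close()
  }

  // Serializing the bare heap fails because it is not Serializable.
  def plainFails(): Boolean =
    try { roundTrip(new SampleHeap(3)); false }
    catch { case _: NotSerializableException => true }

  def run(): Vector[Double] = {
    val samples = new SerializableSamples(3)
    Seq(1.0, 2.0, 3.0, 4.0).foreach(samples.insert) // 4.0 is dropped: bound is 3
    roundTrip(samples).asInstanceOf[SerializableSamples].elements
  }
}
```

A custom Kryo Serializer for the queue (or for IntervalHeap) would presumably need to implement the same write-contents, rebuild-on-read idea; that variant is omitted here because it depends on Kryo and Guava being on the classpath.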