I am developing a discretization algorithm in Flink, but I am having problems applying a map function.
The discretization is stored in `V`, which is defined as:

```scala
private[this] val V = Vector.tabulate(nAttrs)(i => IntervalHeap(nBins, i, s))
```
This Vector is updated in the following method:
```scala
private[this] def updateSamples(v: LabeledVector): Vector[IntervalHeap] = {
  val attrs = v.vector.map(_._2)
  // TODO: Check for missing values
  attrs
    .zipWithIndex
    .foreach {
      case (attr, i) =>
        if (V(i).nInstances < s) {
          V(i) insert (attr)
        } else {
          if (randomReservoir(0) <= s / (i + 1)) {
            val randVal = Random nextInt (s)
            V(i) replace (randVal, attr)
          }
        }
    }
  V
}
```
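For comparison, here is a minimal sketch of standard reservoir sampling (Algorithm R) for one attribute. In Algorithm R a new value is kept with probability s/n, where n is the number of values seen so far for that attribute; `updateSamples` instead divides by the attribute index `i + 1`, and since `s` and `i` are both `Int` (as `Random nextInt (s)` and `zipWithIndex` imply), `s / (i + 1)` is integer division. `Reservoir` is a hypothetical class for illustration, not the question's `IntervalHeap`.

```scala
import scala.util.Random

// Hypothetical single-attribute reservoir (Algorithm R); not the question's IntervalHeap.
final class Reservoir(capacity: Int, rnd: Random) {
  private val buf = new Array[Double](capacity)
  private var seen = 0 // number of values offered so far

  def offer(x: Double): Unit = {
    seen += 1
    if (seen <= capacity) buf(seen - 1) = x // fill phase: keep everything
    else {
      // Draw j uniformly from [0, seen); x is kept with probability capacity / seen
      val j = rnd.nextInt(seen)
      if (j < capacity) buf(j) = x
    }
  }

  def size: Int = math.min(seen, capacity)
  def samples: Vector[Double] = buf.take(size).toVector
}

// Usage: one reservoir per attribute, fed one value per instance
val nAttrs = 2
val s = 3
val rnd = new Random(42)
val reservoirs = Vector.fill(nAttrs)(new Reservoir(s, rnd))
val instances = (1 to 100).map(i => Vector(i.toDouble, -i.toDouble))
instances.foreach { attrs =>
  attrs.zipWithIndex.foreach { case (value, i) => reservoirs(i).offer(value) }
}
```

Keeping a per-attribute counter inside the reservoir makes the acceptance probability independent of which attribute is being sampled.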
The `discretize` method then applies `updateSamples` to each instance of the dataset with a `map`:
```scala
def discretize(data: DataSet[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
  val d = data map (x => updateSamples(x))
  log.debug(s"$V")
  d print
}
```
However, when I print `d`, I get `V` empty:

```
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
```
I've tried not returning `V` from `updateSamples` and just accessing it from `discretize` once the map is applied, but the same thing happens. If I print the value of `V` inside `updateSamples`, I can see that it is being updated.
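One plausible explanation, sketched here without Flink: Flink serializes the function passed to `map`, together with whatever it captures (here the enclosing object and its `V`), and each task runs on a deserialized copy. Updates made inside `updateSamples` would then land on the task's copy rather than on the `V` that `discretize` logs. The snippet imitates that round trip with plain Java serialization; the `ArrayBuffer` is only a stand-in for `V`, and `roundTrip` is a hypothetical helper.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Serialize and deserialize an object, roughly what happens when a user
// function (and the state it captures) is shipped to a task.
def roundTrip[T <: AnyRef](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

// Stand-in for `V` as seen by the driver program.
val driverSamples = ArrayBuffer.empty[Double]

// The "task" receives its own copy and updates only that copy.
val taskSamples = roundTrip(driverSamples)
Seq(0.5, 1.5, 2.5).foreach(x => taskSamples += x)

println(s"task copy:   $taskSamples")   // contains the three values
println(s"driver copy: $driverSamples") // still empty
```

With a plain `Seq`, `map` runs in the same JVM on the same object, so no copy is involved.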
Update
If I don't use the `DataSet[T]` container and instead use a `Seq`, like this:

```scala
def discretize(data: Seq[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
  data map (x => updateSamples(x))
}
```
The discretization works fine.
What may be happening?
Update 2
After a few days of searching, it seems the problem is Guava's `MinMaxPriorityQueue` collection. However, I cannot find anything that helps me serialize this collection. Here is what I have found so far:
- How do you serialize a guava collection?
- magro/kryo-serializers: a collection of Kryo serializers for Guava's collections, but `MinMaxPriorityQueue` is not there.
- I've tried this, which consists of registering a Kryo serializer, but it does not work: `env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])`
- I've also tried adding a default Kryo serializer, with no luck: `env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])`
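A note on the `JavaSerializer` attempts: Kryo's `JavaSerializer` delegates to Java serialization, which only accepts classes implementing `java.io.Serializable`, and as far as I can tell Guava's `MinMaxPriorityQueue` does not (worth checking against the Guava version in use). A common workaround is to serialize only the queue's elements and rebuild the queue on the receiving side, which is what the `write` and `read` methods of a custom Kryo `Serializer` would do. The sketch below shows the idea with JDK serialization only: `new Object` stands in for a non-serializable type, `heap` (a Scala `mutable.PriorityQueue`) plays the role of the Guava queue, and `isJavaSerializable`/`roundTrip` are hypothetical helpers. With Guava, the rebuild step could use `MinMaxPriorityQueue.create(...)` on the received elements.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// True if plain Java serialization (what JavaSerializer relies on) accepts the object.
def isJavaSerializable(obj: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj); true }
  catch { case _: NotSerializableException => false }

// Serialize and deserialize, standing in for shipping data between JVMs.
def roundTrip[T <: AnyRef](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

// mutable.PriorityQueue dequeues the largest element, so reverse the ordering for a min-heap.
val minOrder = Ordering[Double].reverse
val heap = mutable.PriorityQueue(3.0, 1.0, 2.0)(minOrder)

// Workaround idea: ship a snapshot of the elements and rebuild the heap on arrival.
val snapshot: Array[Double] = heap.toArray        // a plain array serializes fine
val received: Array[Double] = roundTrip(snapshot) // what the other side reads back
val rebuilt = mutable.PriorityQueue(received: _*)(minOrder)
```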