I've encountered several serialization exceptions, and I searched the internet and Flink's docs; there are some well-known solutions such as marking fields transient, extending Serializable, etc. Each time the origin of the exception was very clear, but in this case I am unable to find exactly what is not serializable.
Q: How should I debug this kind of exception?
A.scala:
class executor(val sink: SinkFunction[List[String]]) {
  def exe(): Unit = {
    xxx.....addSink(sink)
  }
}
B.scala:
class Main extends App {
  def createSink(): SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
      // add strs into the variable "stringList" of the companion object StringSink
    }
  }

  new executor(createSink()).exe()
  // then do something with the strings
}
The exception is:
The implementation of the SinkFunction is not serializable. The object probably contains or references non serializable fields.
Two suspicious points that I found:
- The instance of StringSink is passed into another file.
- The class StringSink uses a static variable, stringList, of its companion object.
I faced similar problems. It used to take a long time to find out which member or object was not serializable. The exception logs are not really helpful.
What helped me is the following JVM option, which adds more detail to the exception trace.
Enable this option:
-Dsun.io.serialization.extendedDebugInfo=true
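To narrow things down outside of a Flink job, one option is to serialize the suspect object yourself with plain Java serialization and look at which class the exception names. The sketch below is only an illustration under some assumptions: SerializationCheck, Outer, InnerSink and TopLevelSink are hypothetical names, and the sinks are plain Serializable classes standing in for SinkFunction so the example runs without Flink on the classpath. It shows an inner class that touches a member of a non-serializable enclosing class (similar to StringSink inside Main), next to the same sink declared at top level.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper (not part of Flink): Java-serializes `obj` into memory and
// returns the exception message if something in its object graph is not serializable.
object SerializationCheck {
  def firstNonSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}

// Stand-in for the question's Main: an enclosing class that is not Serializable.
class Outer {
  var collected: List[String] = List()

  // Inner class: it uses a member of Outer, so every instance keeps a
  // reference to the enclosing Outer instance, which gets serialized too.
  class InnerSink extends Serializable {
    def invoke(strs: List[String]): Unit = {
      collected = collected ++ strs
    }
  }

  def createSink(): InnerSink = new InnerSink
}

// The same sink declared at top level, with its state in a top-level companion object.
object TopLevelSink {
  @volatile var collected: List[String] = List()
}

class TopLevelSink extends Serializable {
  def invoke(strs: List[String]): Unit = {
    TopLevelSink.collected = TopLevelSink.collected ++ strs
  }
}

object SerializationCheckDemo {
  def main(args: Array[String]): Unit = {
    // Expected to report the enclosing class as the non-serializable one.
    println(SerializationCheck.firstNonSerializable(new Outer().createSink()))
    // Expected to serialize without an exception.
    println(SerializationCheck.firstNonSerializable(new TopLevelSink))
  }
}
```

Running the same check with the JVM option above enabled should add the field path that leads to the offending object to the message. A companion object used as shared state like this is only meaningful when everything runs in one JVM, for example in a local test.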
My first guess would be that you don't have a no-argument constructor in StringSink.
Rules for POJO types, clipped from the Flink documentation:
Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:
Just add a no-argument constructor and try again.