How to debug a serialization exception in Flink?

Posted 2019-07-16 04:49

Question:

I've run into several serialization exceptions and searched the internet and Flink's documentation; there are well-known fixes such as marking fields transient or extending Serializable. Usually the origin of the exception is clear, but in this case I cannot find what exactly is not serializable.

Q: How should I debug this kind of exception?

A.scala:

class executor(val sink: SinkFunction[List[String]]) {
    def exe(): Unit = {
        xxx.....addSink(sink)
    }
}

B.scala:

class Main extends App {
  def createSink(): SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the companion object StringSink
    }
  }

  new executor(createSink()).exe()

  // then do something with the strings
}

The exception is:

The implementation of the SinkFunction is not serializable. The object probably contains or references non serializable fields.

Two suspicious points that I found:

  1. The instance of StringSink is passed to a class defined in another file.
  2. The StringSink class uses the static variable stringList of its companion object.

Answer 1:

My first guess would be that you don't have a no-argument constructor in StringSink.

Rules for POJO types (quoted from the Flink documentation):

Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:

  1. The class is public and standalone (no non-static inner class)
  2. The class has a public no-argument constructor
  3. All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Just add a no-argument constructor and try again:

    class StringSink() extends SinkFunction[List[String]] {
        // The empty parameter list above is the explicit public no-argument
        // constructor; Scala has no `public StringSink() {}` syntax.
        override def invoke(strs: List[String]): Unit = {
            // add strs into the variable "stringList" of the companion object StringSink
        }
    }
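
The POJO rules above describe data types, but the "no non-static inner class" point has a counterpart for functions. Flink's SinkFunction extends java.io.Serializable, and an inner class that touches a member of its enclosing instance keeps a hidden reference to that instance, so the enclosing class has to be serializable too. The sketch below illustrates this with plain Java serialization. `Sink`, `Enclosing`, `InnerSink`, `StandaloneSink` and `InnerClassDemo` are hypothetical stand-ins, not Flink classes, and this is only one possible cause of the exception in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a function interface that extends Serializable.
trait Sink[T] extends Serializable { def invoke(value: T): Unit }

// Hypothetical enclosing class that is NOT Serializable.
class Enclosing {
  val prefix = "got: "

  // Inner class: it reads `prefix`, so each instance holds a reference
  // to the Enclosing instance that created it.
  class InnerSink extends Sink[String] {
    override def invoke(value: String): Unit = println(prefix + value)
  }
}

// Standalone class: no hidden reference to an enclosing instance.
class StandaloneSink extends Sink[String] {
  override def invoke(value: String): Unit = println(value)
}

object InnerClassDemo {
  // True when `obj` and everything it references can be Java-serialized.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If the inner sink fails this check while the standalone one passes, moving the sink to the top level or into an `object` is a reasonable thing to try.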


Answer 2:

I faced similar problems. It used to take a long time to find out which member or object was not serializable. The exception logs are not really helpful.

What helped me is the following JVM option, which adds more detail to the exception trace.

Enable this option:

-Dsun.io.serialization.extendedDebugInfo=true
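
To reproduce the failure outside a running job, one option is to push the function object through plain Java serialization yourself. The following is a minimal sketch under that assumption; `SerializationCheck`, `Connection`, `GoodSink` and `BadSink` are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Returns None when `obj` serializes cleanly, otherwise the exception
  // message, which names the class that could not be serialized.
  def firstProblem(obj: AnyRef): Option[String] =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }
}

// Hypothetical classes for illustration only.
class Connection                                         // not Serializable
class GoodSink extends Serializable                      // no problematic fields
class BadSink(val conn: Connection) extends Serializable // drags in Connection
```

Calling `SerializationCheck.firstProblem(new BadSink(new Connection))` returns a message naming `Connection`. When the JVM is started with the option above (for example through the VM options of an IDE run configuration), that message also lists the chain of fields leading to the offending object. The flag should be passed at JVM startup rather than set from code, because it is read when the serialization classes are first loaded.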