java.lang.InstantiationException while deserializing an avro byte stream into a Scala case class

Posted 2019-08-09 08:23

Question:

I am trying to deserialize an avro byte stream into a Scala case class object. Basically, I have a Kafka stream with avro-encoded data flowing through it, and now there is an addition to the schema, so I am trying to update the Scala case class to include the new field. The case class looks like this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String] = None
                     ) {
  def this() = this("na", "na", "na", 0, None)
}

The avro schema is as follows:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

When the data is received, I get the following exception:

java.lang.RuntimeException: java.lang.InstantiationException

I can receive the data just fine with a consumer written in Python, so I know that the data is being streamed correctly and in the correct format. I suspect the problem is with the creation of the case class constructor; I have tried doing this:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double,
                      new_field: Option[String]
                     ) {
  def this() = this("na", "na", "na", 0, Some("na"))
}

but no luck.

The deserializer code is as follows (excerpts):

import org.apache.avro.io.{BinaryDecoder, DatumReader, DecoderFactory}

// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder: BinaryDecoder = null

decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)

I could not find any other examples of defining constructors for case classes that are used for deserializing avro. I had posted a related question last year, java.lang.NoSuchMethodException for init method in Scala case class, and based on the response I was able to implement my current code, which has been working fine ever since.
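For reference, the kind of reader setup that needs a no-arg constructor on the case class usually looks something like the sketch below. The exact initialization is not shown in my excerpt above, so the ReflectDatumReader and the deviceDataSchemaJson string here are assumptions, not my actual code:

import org.apache.avro.Schema
import org.apache.avro.io.{BinaryDecoder, DatumReader, DecoderFactory}
import org.apache.avro.reflect.ReflectDatumReader

// Assumed setup, since the excerpt above does not show how `reader` is built.
// A reflect-based reader instantiates DeviceData via reflection, which is why
// the case class needs a no-arg constructor.
val schema: Schema = new Schema.Parser().parse(deviceDataSchemaJson) // deviceDataSchemaJson: the record schema above
val reader: DatumReader[DeviceData] = new ReflectDatumReader[DeviceData](schema)

def deserialize(message: Array[Byte]): DeviceData = {
  val decoder: BinaryDecoder = DecoderFactory.get.binaryDecoder(message, null)
  reader.read(null.asInstanceOf[DeviceData], decoder)
}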

Answer 1:

I resolved this problem by following a totally different approach. I used the Confluent Kafka client as shown in this example: https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. I also have a Confluent Schema Registry, which is really easy to set up using the containerized all-in-one solution that comes with Kafka and a schema registry: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.

I had to add the Confluent dependencies and repository to my pom.xml file. This goes in the repositories section:

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

This goes in the dependencies section:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

With the code provided in https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala I was able to talk to the Confluent Schema Registry. Based on the schema id in the avro message header, it downloads the schema from the registry and gives me back a GenericRecord object, from which I can easily extract any and all fields of interest and create a new DataStream of DeviceData objects.

val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    // wrap in Option so the null default for new_field does not throw a NullPointerException
    Option(x.get("new_field")).map(_.toString))})

The Confluent Kafka client takes care of deserializing the avro byte stream as per the schema, including the default values. Setting up the schema registry and using the Confluent Kafka client may take a little bit of time to get used to, but it is probably the better long-term solution, just my 2 cents.
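For reference, the default-value behaviour mentioned above comes from Avro's standard writer/reader schema resolution, which the Confluent deserializer relies on. A minimal sketch of that mechanism (writerSchemaJson, readerSchemaJson and oldMessageBytes are illustrative names, not part of my actual code):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Illustrative only: writerSchemaJson is the old schema (without new_field),
// readerSchemaJson is the updated schema from the question, and oldMessageBytes
// is a record that was serialized with the old schema.
val writerSchema = new Schema.Parser().parse(writerSchemaJson)
val readerSchema = new Schema.Parser().parse(readerSchemaJson)

// Avro resolves the writer schema against the reader schema and fills in defaults.
val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
val decoder = DecoderFactory.get.binaryDecoder(oldMessageBytes, null)
val record: GenericRecord = reader.read(null, decoder)

// Records written before the schema change come back with new_field set to its
// default (null), so wrapping it in Option is the safe way to read it.
val newField: Option[String] = Option(record.get("new_field")).map(_.toString)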