Apache Camel Kafka support for Confluent schema Re

2019-07-22 15:14发布

问题:

I am trying to create camel route with kafka component trying to consume events with io.confluent.kafka.serializers.KafkaAvroDeserializer and schemaRegistry url along with other component parameters. I am not sure if this is full supported by Camel-Kafka currently. Can someone please comment on this ?

from("kafka:{{kafka.notification.topic}}?brokers={{kafka.notification.brokers}}"
                + "&maxPollRecords={{kafka.notification.maxPollRecords}}"
                + "&seekTo={{kafka.notification.seekTo}}"
                + "&specificAvroReader=" + "true"
                + "&valueDeserializer=" + "io.confluent.kafka.serializers.KafkaAvroDeserializer"
                +"&schemaRegistryURL=localhost:9021"
                + "&allowManualCommit={{kafka.notification.autocommit}})

specificAvroReader & schemaRegistryURL are the properties which seems to be not supported.

回答1:

I believe the only way currently to have camel-kafka to work with Confluent Schema Registry is to write a custom AvroSerilizer/ AvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroSerializer/ io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer). E.g.:

BlablaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>

and

BlablaSerializer extends AbstractKafkaAvroSerializer implements Serializer<Object>

and then set them on the camel component. E.g. for the value it will be:

 KafkaConfiguration kafkaConfiguration.setValueDeserializer(...)


回答2:

Got this working after adding compile 'org.apache.camel:camel-kafka:3.0.0-M2' which can be found in this staging repository https://repository.apache.org/content/repositories/orgapachecamel-1124/org/apache/camel/

I think 3.0.0-M2 will be officially supported by Camel early next week.

Edit : 3.0.0-M2 available now https://repository.apache.org/content/repositories/releases/org/apache/camel/apache-camel/3.0.0-M2/

Has support for camel Kafka & Confluent Schema registry