I am trying to create camel route with kafka component trying to consume events with io.confluent.kafka.serializers.KafkaAvroDeserializer
and schemaRegistry url along with other component parameters. I am not sure if this is full supported by Camel-Kafka currently. Can someone please comment on this ?
from("kafka:{{kafka.notification.topic}}?brokers={{kafka.notification.brokers}}"
+ "&maxPollRecords={{kafka.notification.maxPollRecords}}"
+ "&seekTo={{kafka.notification.seekTo}}"
+ "&specificAvroReader=" + "true"
+ "&valueDeserializer=" + "io.confluent.kafka.serializers.KafkaAvroDeserializer"
+"&schemaRegistryURL=localhost:9021"
+ "&allowManualCommit={{kafka.notification.autocommit}})
specificAvroReader
& schemaRegistryURL
are the properties which seems to be not supported.
I believe the only way currently to have camel-kafka to work with Confluent Schema Registry is to write a custom AvroSerilizer/ AvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroSerializer/ io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer). E.g.:
BlablaDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>
and
BlablaSerializer extends AbstractKafkaAvroSerializer implements Serializer<Object>
and then set them on the camel component. E.g. for the value it will be:
KafkaConfiguration kafkaConfiguration.setValueDeserializer(...)
Got this working after adding compile 'org.apache.camel:camel-kafka:3.0.0-M2' which can be found in this staging repository https://repository.apache.org/content/repositories/orgapachecamel-1124/org/apache/camel/
I think 3.0.0-M2 will be officially supported by Camel early next week.
Edit : 3.0.0-M2 available now https://repository.apache.org/content/repositories/releases/org/apache/camel/apache-camel/3.0.0-M2/
Has support for camel Kafka & Confluent Schema registry