My KafkaProducer is able to use KafkaAvroSerializer to serialize objects to my topic. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of my serialized class.
My KafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
    Properties properties = new Properties();
    properties.load(props);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");
    MyBean bean = new MyBean();
    producer = new KafkaProducer<>(properties);
    producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
}
My KafkaConsumer
KafkaConsumer<CharSequence, MyBean> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
    Properties properties = new Properties();
    properties.load(props);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");
    consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
    while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
            continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
            // Throws ClassCastException: GenericRecord cannot be cast to MyBean
            MyBean bean = record.value();
            System.out.println("consumer received: " + bean);
        }
    }
} finally {
    consumer.close();
}
The line MyBean bean = record.value(); throws a ClassCastException because GenericRecord cannot be cast to MyBean.
I'm using kafka-client-0.9.0.1, kafka-avro-serializer-3.0.0.
KafkaAvroDeserializer supports SpecificData
It's not enabled by default. To enable it:
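As a rough sketch of what enabling it can look like: the deserializer reads a boolean setting whose string key is "specific.avro.reader" (the value behind KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG). The example below uses plain string keys so that it compiles without the Kafka or Confluent jars on the classpath; the class name SpecificReaderConfig and the helper consumerProperties are hypothetical names for illustration. It also assumes MyBean is a class generated by the Avro compiler (a SpecificRecord); for a plain POJO this setting alone is probably not enough.

```java
import java.util.Properties;

public class SpecificReaderConfig {

    // Builds consumer properties that ask KafkaAvroDeserializer to return
    // SpecificRecord instances instead of GenericRecord.
    static Properties consumerProperties(String schemaRegistryUrl) {
        Properties properties = new Properties();
        properties.setProperty("key.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.setProperty("schema.registry.url", schemaRegistryUrl);
        // String form of KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG.
        properties.setProperty("specific.avro.reader", "true");
        return properties;
    }

    public static void main(String[] args) {
        // The registry URL is the one used in the question.
        Properties properties = consumerProperties("http://localhost:8081");
        System.out.println(properties.getProperty("specific.avro.reader"));
    }
}
```

These properties would then be merged with the rest of the consumer configuration (bootstrap servers, group id, and so on) before constructing the KafkaConsumer.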
KafkaAvroDeserializer does not support ReflectData
Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:

Define a custom deserializer class which deserializes to MyBean:

Configure KafkaConsumer to use the custom deserializer class:

To add to Chin Huang's answer, for minimal code and better performance, you should probably implement it this way: