My KafkaProducer is able to use KafkaAvroSerializer to serialize objects to my topic. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of an instance of my serialized class.
My KafkaProducer
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
producer = new KafkaProducer<>(properties);
}
MyBean bean = new MyBean();
producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
My KafkaConsumer
KafkaConsumer<CharSequence, MyBean> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
if (records.isEmpty()) {
continue;
}
for (ConsumerRecord<CharSequence, MyBean> record : records) {
MyBean bean = record.value(); // throws ClassCastException: GenericRecord cannot be cast to MyBean
System.out.println("consumer received: " + bean);
}
}
} finally {
consumer.close();
}
The line MyBean bean = record.value(); throws a ClassCastException because GenericRecord cannot be cast to MyBean.
I'm using kafka-client-0.9.0.1 and kafka-avro-serializer-3.0.0.
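For anyone wondering why this compiles at all: the type arguments on ConsumerRecords<CharSequence, MyBean> are erased at runtime, so the mismatch only surfaces when the value is assigned to a MyBean variable. Below is a minimal sketch of that mechanism using only the JDK. FakeGenericRecord, FakeBean, deserialize() and poll() are hypothetical stand-ins, not the Avro or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are NOT the real Avro or Kafka types.
final class ErasureDemo {
    static final class FakeGenericRecord { }
    static final class FakeBean { }

    // Mimics a deserializer whose declared return type is Object.
    static Object deserialize() {
        return new FakeGenericRecord();
    }

    // The unchecked cast compiles because generic type arguments are erased.
    @SuppressWarnings("unchecked")
    static <V> List<V> poll() {
        List<Object> raw = new ArrayList<>();
        raw.add(deserialize());
        return (List<V>) (List<?>) raw;
    }

    // Returns true if reading the value as FakeBean fails with ClassCastException.
    static boolean castFails() {
        List<FakeBean> records = poll();
        try {
            FakeBean bean = records.get(0); // compiler-inserted cast fails here
            return bean == null;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails: " + castFails()); // prints "cast fails: true"
    }
}
```

The generic signature of the consumer is only a promise to the compiler; the deserializer decides what object actually comes back.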
KafkaAvroDeserializer supports SpecificData
It's not enabled by default. To enable it:
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
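If you prefer to keep this in consumer.props or build the configuration from plain strings, the following sketch shows the same setting with literal keys. It assumes that SPECIFIC_AVRO_READER_CONFIG resolves to the key specific.avro.reader and that the registry runs at the URL from the question. The class and method names are made up for illustration. Note that this only helps when MyBean is an Avro-generated class (a SpecificRecord).

```java
import java.util.Properties;

// Illustrative helper; the class and method names are hypothetical.
final class SpecificReaderConfigDemo {
    static Properties consumerProperties(String registryUrl) {
        Properties p = new Properties();
        // String forms of the ConsumerConfig deserializer keys.
        p.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.put("schema.registry.url", registryUrl);
        // Assumed literal key behind SPECIFIC_AVRO_READER_CONFIG.
        p.put("specific.avro.reader", "true");
        return p;
    }

    public static void main(String[] args) {
        Properties p = consumerProperties("http://localhost:8081");
        System.out.println(p.getProperty("specific.avro.reader")); // prints "true"
    }
}
```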
KafkaAvroDeserializer does not support ReflectData
Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:
/**
* Extends deserializer to support ReflectData.
*
* @param <V>
* value type
*/
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {
private Schema readerSchema;
private DecoderFactory decoderFactory = DecoderFactory.get();
protected ReflectKafkaAvroDeserializer(Class<V> type) {
readerSchema = ReflectData.get().getSchema(type);
}
@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchemaIgnored) throws SerializationException {
if (payload == null) {
return null;
}
int schemaId = -1;
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
schemaId = buffer.getInt();
Schema writerSchema = schemaRegistry.getByID(schemaId);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - 1 - idSize;
DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
return reader.read(null, decoder);
} catch (IOException e) {
throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}
}
}
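The start and length arithmetic above relies on the framing Confluent's serializer puts in front of the Avro bytes: one magic byte, then a 4-byte schema ID, then the Avro body. Here is a rough JDK-only sketch of that framing, assuming a magic byte of 0 and an ID size of 4. The class and helper names are hypothetical, and the body bytes are placeholders rather than real Avro data.

```java
import java.nio.ByteBuffer;

// Illustrative only; mirrors the header handling in the deserializer above.
final class WireFormatDemo {
    static final byte MAGIC_BYTE = 0x0; // assumed value of the magic byte
    static final int ID_SIZE = 4;       // assumed size of the schema ID in bytes

    // Builds a framed payload: magic byte, schema ID, then the body.
    static byte[] frame(int schemaId, byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(1 + ID_SIZE + body.length);
        out.put(MAGIC_BYTE).putInt(schemaId).put(body);
        return out.array();
    }

    // Reads the schema ID after checking the magic byte.
    static int schemaId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();
    }

    // Returns {start, length} of the body, computed as in the deserializer.
    static int[] bodyRange(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        buffer.get();    // skip the magic byte
        buffer.getInt(); // skip the schema ID
        int start = buffer.position() + buffer.arrayOffset();
        int length = buffer.limit() - 1 - ID_SIZE;
        return new int[] {start, length};
    }

    public static void main(String[] args) {
        byte[] payload = frame(42, new byte[] {1, 2, 3});
        int[] range = bodyRange(payload);
        // prints "schema id = 42, body starts at 5, length 3"
        System.out.println("schema id = " + schemaId(payload)
                + ", body starts at " + range[0] + ", length " + range[1]);
    }
}
```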
Define a custom deserializer class which deserializes to MyBean:
public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
public MyBeanDeserializer() {
super(MyBean.class);
}
}
Configure KafkaConsumer to use the custom deserializer class:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
To add to Chin Huang's answer: for less code and better performance, you should probably implement it this way:
/**
* Extends deserializer to support SpecificData.
*
* @param <T>
* value type
*/
public abstract class SpecificKafkaAvroDeserializer<T extends SpecificRecordBase> extends AbstractKafkaAvroDeserializer implements Deserializer<T> {
private final Schema schema;
private Class<T> type;
private DecoderFactory decoderFactory = DecoderFactory.get();
protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
this.type = type;
this.schema = ReflectData.get().getSchema(type);
this.configure(this.deserializerConfig(props));
}
public void configure(Map<String, ?> configs) {
this.configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
protected T deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchemaIgnore) throws SerializationException {
if (payload == null) {
return null;
}
int schemaId = -1;
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
schemaId = buffer.getInt();
Schema writerSchema = schemaRegistry.getByID(schemaId);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - 1 - idSize;
// Reuse the reader schema computed once in the constructor instead of rebuilding it per message.
SpecificDatumReader<T> reader = new SpecificDatumReader<>(writerSchema, schema);
BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
return reader.read(null, decoder);
} catch (IOException e) {
throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}
}
}