I like Kafka, but hate having to write lots of serializers/deserializers, so I tried to create a GenericDeserializer<T>
that could deserialize a generic type T.
Here's my attempt:
class GenericDeserializer< T > implements Deserializer< T > {

    static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public T deserialize( String topic, byte[] data) {
        T result = null;
        try {
            result = ( T )( objectMapper.readValue( data, T.class ) );
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public void close() {
    }
}
However, the (Eclipse) Java compiler complains about the line

result = ( T )( objectMapper.readValue( data, T.class ) );

with the message Illegal class literal for the type parameter T.
Questions:
- Can you please explain the meaning of the message?
- Is there any way to work around this to get the intended effect?
In Java, you cannot instantiate a generic type parameter, even reflectively, meaning objectMapper.readValue() could not do anything with T.class. So you would need to know what class to create in a given situation. The logical way to do that is to have some mapping of topic -> type that your deserializer can access. An example of this is the SpecificAvroSerde, which uses the Confluent schema registry (an external process) to identify which type to deserialize into. You could also build this mapping into your code, but depending on your use case, that wouldn't be particularly robust.
https://github.com/confluentinc/schema-registry/blob/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/SpecificAvroSerde.java
The meat of SpecificAvroSerde is a little deeper - here's the chunk that does the work of asking the schema registry what type it should decode into:
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L109-L139
Of course, this code is all clouded by the complexities of Avro, but the same topic -> type mapping idea works in-memory with plain JSON.
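As a rough sketch of that idea with Jackson, assuming a hard-coded topic -> class map (the topic and class names in the comment are made up for illustration):

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch: pick the target class per topic instead of relying on T.class.
public class TopicMappingDeserializer implements Deserializer<Object> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Map<String, Class<?>> topicToType;

    // e.g. Map.of("orders", Order.class, "users", User.class) -- hypothetical types
    public TopicMappingDeserializer(Map<String, Class<?>> topicToType) {
        this.topicToType = topicToType;
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        Class<?> targetType = topicToType.get(topic);
        if (targetType == null) {
            throw new SerializationException("No type registered for topic [" + topic + "]");
        }
        try {
            return mapper.readValue(data, targetType);
        } catch (IOException ex) {
            throw new SerializationException("Can't deserialize data from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public void close() {}
}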
You can achieve generic deserialization by using TypeReference from the package com.fasterxml.jackson.core.type:
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaGenericDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper;
    private final TypeReference<T> typeReference;

    public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
        this.mapper = mapper;
        this.typeReference = typeReference;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.readValue(data, typeReference);
        } catch (final IOException ex) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}
}
Using such a generic deserializer, you can create a Serde:
public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
    KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
    return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}
Here JsonSerializer comes from the spring-kafka dependency; alternatively, implement your own serializer.
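For example, a minimal Jackson-based serializer could look like this (a sketch, not the spring-kafka implementation; it assumes you reuse the same ObjectMapper setup as the deserializer):

import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaGenericSerializer<T> implements Serializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(final String topic, final T data) {
        if (data == null) {
            return null;
        }
        try {
            // Jackson handles any POJO here, so one serializer covers all value types.
            return mapper.writeValueAsBytes(data);
        } catch (final JsonProcessingException ex) {
            throw new SerializationException("Can't serialize data for topic [" + topic + "]", ex);
        }
    }

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}

    @Override
    public void close() {}
}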
After that, you can use the Serde when creating a Kafka stream:
TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);