How to implement a generic Kafka Streams Deserializer

Posted 2019-07-14 14:28

Question:

I like Kafka, but hate having to write lots of serializers/deserializers, so I tried to create a GenericDeserializer<T> that could deserialize a generic type T.

Here's my attempt:

class GenericDeserializer< T > implements Deserializer< T > {
    static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    @Override
    public T deserialize( String topic, byte[] data) {
            T result = null;
            try {
                    result = ( T )( objectMapper.readValue( data, T.class ) );
            }
            catch ( Exception e ) {
                    e.printStackTrace();
            }
            return result;
    }
    @Override
    public void close() {
    }
}

However, the (Eclipse) Java compiler complains about the line

result = ( T )( objectMapper.readValue( data, T.class ) );

with the message Illegal class literal for the type parameter T.

Questions:

  1. Can you please explain the meaning of the message?
  2. Is there any way to work around this to get the intended effect?

Answer 1:

In Java, generic type parameters are erased at compile time, so T has no class object at runtime. That is what the message means: T.class is not a legal expression, and objectMapper.readValue() could not do anything with it anyway. You need to know which concrete class to create in a given situation. The logical way to do that is to have some mapping of topic -> type that your deserializer can access. An example of this is the SpecificAvroSerde, which uses the Confluent Schema Registry (an external process) to identify which type to deserialize into. You could also build this mapping into your code but, depending on your use case, that wouldn't be particularly robust.

https://github.com/confluentinc/schema-registry/blob/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/SpecificAvroSerde.java

The meat of SpecificAvroSerde is a little deeper - here's a chunk that is doing the work of asking the schema registry what type it should decode into: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L109-L139

Of course, this code is all clouded by the complexities of Avro. I'd write some example code about how to do this in-memory with JSON if I had time.
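A minimal in-memory sketch of the topic -> type mapping idea described above, offered as an illustration rather than a finished implementation. To stay self-contained it uses only the JDK: TopicTypeRegistry, TopicAwareDeserializer and toyDecode are hypothetical names, and the decoder function is a stand-in for a real JSON decoder (for example a wrapper around Jackson's objectMapper.readValue(data, type) that handles its IOException). In a real application the class would implement org.apache.kafka.common.serialization.Deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical registry: remembers which Java class each topic carries.
final class TopicTypeRegistry {
    private final Map<String, Class<?>> typesByTopic = new ConcurrentHashMap<>();

    void register(String topic, Class<?> type) {
        typesByTopic.put(topic, type);
    }

    Class<?> typeFor(String topic) {
        Class<?> type = typesByTopic.get(topic);
        if (type == null) {
            throw new IllegalArgumentException("No type registered for topic " + topic);
        }
        return type;
    }
}

// Hypothetical deserializer: looks up the target class by topic at runtime,
// so no T.class is ever needed.
final class TopicAwareDeserializer {
    private final TopicTypeRegistry registry;
    // Assumption: stand-in for a real decoder such as a Jackson wrapper.
    private final BiFunction<byte[], Class<?>, Object> decoder;

    TopicAwareDeserializer(TopicTypeRegistry registry,
                           BiFunction<byte[], Class<?>, Object> decoder) {
        this.registry = registry;
        this.decoder = decoder;
    }

    Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Kafka passes null for tombstone records
        }
        return decoder.apply(data, registry.typeFor(topic));
    }
}

final class TopicRegistryDemo {
    // Toy decoder for the demo only: reads the payload as an Integer or a String.
    static Object toyDecode(byte[] data, Class<?> type) {
        String text = new String(data, StandardCharsets.UTF_8);
        if (type == Integer.class) {
            return Integer.valueOf(text);
        }
        if (type == String.class) {
            return text;
        }
        throw new IllegalArgumentException("Unsupported type " + type.getName());
    }

    public static void main(String[] args) {
        TopicTypeRegistry registry = new TopicTypeRegistry();
        registry.register("counts", Integer.class);
        registry.register("names", String.class);

        TopicAwareDeserializer deserializer =
                new TopicAwareDeserializer(registry, TopicRegistryDemo::toyDecode);
        Object count = deserializer.deserialize("counts", "42".getBytes(StandardCharsets.UTF_8));
        System.out.println(count.getClass().getSimpleName() + " " + count);
    }
}
```

The trade-off is the one mentioned above: the mapping lives in your code, so every new topic needs a new register() call, and a producer that changes its payload type will break the consumer at runtime.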



Answer 2:

You can achieve generic deserialization by using TypeReference from the package com.fasterxml.jackson.core.type:

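import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

public class KafkaGenericDeserializer<T> implements Deserializer<T> {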

    private final ObjectMapper mapper;
    private final TypeReference<T> typeReference;

    public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
        this.mapper = mapper;
        this.typeReference = typeReference;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return mapper.readValue(data, typeReference);
        } catch (final IOException ex) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}
}

Using such a generic deserializer, you can create a Serde:

public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
    KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
    return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}

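Here JsonSerializer comes from the spring-kafka dependency (alternatively, implement your own serializer), and objectMapper is assumed to be an ObjectMapper instance already in scope.

After that, you can use the Serde when creating the Kafka stream: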

TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);
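The trailing {} in new TypeReference<...>() {} matters: it creates an anonymous subclass, and unlike a bare type variable, the generic superclass of a class is recorded in the class file and can be read back reflectively. The sketch below reproduces that trick with only the JDK. TypeToken is a hypothetical stand-in written for illustration, not Jackson's actual TypeReference source.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for the idea behind Jackson's TypeReference.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // For an anonymous subclass, the generic superclass is TypeToken<ActualType>.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeToken must be created with a type argument");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

final class TypeTokenDemo {
    public static void main(String[] args) {
        // The {} creates the anonymous subclass that carries List<String>.
        TypeToken<List<String>> token = new TypeToken<List<String>>() {};
        // Prints the full parameterized type, including the String argument.
        System.out.println(token.getType().getTypeName());
    }
}
```

This is why the deserializer above can take a TypeReference<T> in its constructor where T.class is impossible: the caller supplies the type information at the one place where the concrete type is still known.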