i am playing with Kafka and streams technology; i have created a custom serializer and deserializer for the KStream that i will use to receive messages from a given topic.
Now, the problem is that i am creating a serde in this way:
JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
Serializer implementation:
public class JsonSerializer<T> implements Serializer<T> {
private Gson gson = new Gson();
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, T data) {
return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
}
@Override
public void close() {
}
}
Deserializer implementation:
public class JsonDeserializer<T> implements Deserializer<T> {
private Gson gson = new Gson();
private Class<T> deserializedClass;
public JsonDeserializer() {
}
public JsonDeserializer(Class<T> deserializedClass) {
this.deserializedClass = deserializedClass;
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
if(deserializedClass == null) {
deserializedClass = (Class<T>) map.get("serializedClass");
}
}
@Override
public T deserialize(String topic, byte[] data) {
System.out.print(data);
if(data == null){
return null;
}
return gson.fromJson(new String(data),deserializedClass);
}
@Override
public void close() {
}
}
When i try to execute the code, i receive the following error:
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?
Full dump here: https://pastebin.com/WwpuXuxB
This is the way i am trying to use serde:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);
KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));
outStream.to("output");
Also, i am not totally sure i am setting up correctly the properties to setup up serializer and deserializer globally:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
To complete the Matthias answer I've just coded a simple example of how to create a custom Serde (Serializer / Deserializer) within a Kafka Stream App. It's is available to clone and try in: https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder
First I create two classes, one for the Serializer and another for the Deserializer. In this case I use Gson library to perform the serialization/deserialization.
Serializer
Deserializer
Then, I wrap both of them into a Serde to be able to use it into my Kafka Stream App.
Serde
Finally, you are able to use this Serde class into your Kafka Stream App with the next line:
This is actually working with the latest Kafka version available at this moment which is 1.0.0!
If you call
Serdes.serdeFrom(...)
you get aWrappedSerde
type back that is for internal usage (andWrappedSerde
does not have an non-argument constructor). There is currently no API you can call to get a customSerde
. Instead, you need to implement you ownSerde
class and wrap you serializer and deserializer "manually".In your
Properties
you can set:Another way is using
StreamsBuilder
instead ofKStreamBuilder
.KStreamBuilder
is deprecated in 1.0.0. You can directly pass serde object usingConsumed.with
while creating stream. You need not to create customSerde
class in this scenario.You can keep
StringSerde
in below code instead of usingmessageSerde.getClass()
which is failing becausemessageSerde
is just aWrappedSerde
that does not have non-argument constructor.