I need to create a state store with a String as the key and a HashMap as the value. I tried the two methods below.
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();
// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();
The code compiles without any errors, but I get a runtime error:
io.confluent.examples.streams.WordCountProcessorAPI
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
Can someone suggest the correct way to create such a state store?
If you want to create a state store, you need to provide a serializer and a deserializer class for the type you want to use. In Kafka Streams, there is an abstraction called Serde that wraps the serializer and the deserializer in a single class.
If you use .withValues(Class<K> keyClass), it must hold that the class is one of the types for which Kafka has a built-in serde. Because there is no built-in Serde for HashMap, you need to implement one first (maybe called HashMapSerde) and pass it to the method .withValues(Serde<K> keySerde). Furthermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them, which makes the implementation of the serializer and deserializer much simpler. Something like this (just a sketch; generic types omitted):
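As an illustration of the serializer and deserializer logic for the HashMap<String, Double> from the question, here is a minimal sketch of a byte-level encoding that such a pair could delegate to. The class name HashMapCodec and the wire format (entry count, then key/value pairs) are assumptions made for this example and are not part of Kafka; the code uses only the JDK and assumes non-null keys and values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka class: encodes a HashMap<String, Double>
// as [int entry count] followed by [UTF key][double value] per entry.
final class HashMapCodec {

    private HashMapCodec() {}

    // Turns the map into bytes; a null map yields null, following the
    // common convention for Kafka serializers.
    static byte[] serialize(HashMap<String, Double> map) {
        if (map == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());      // keys must be non-null and encode to under 64 KB
                out.writeDouble(entry.getValue()); // values must be non-null
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the map from bytes produced by serialize(); null yields null.
    static HashMap<String, Double> deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            HashMap<String, Double> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                map.put(key, in.readDouble());
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Double> averages = new HashMap<>();
        averages.put("a", 1.5);
        byte[] encoded = serialize(averages);
        System.out.println(deserialize(encoded)); // prints {a=1.5}
    }
}
```

In a Kafka Streams application, these two methods would be called from classes implementing org.apache.kafka.common.serialization.Serializer and Deserializer. The resulting pair can be combined into a Serde with Serdes.serdeFrom(serializer, deserializer) and passed to .withValues(...).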
If you want to see examples of how to implement (de)serializers and a Serde, have a look at https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java