KTable unable to fetch data from materialized view

Posted 2019-06-10 21:49

Question:

I am using Kafka Streams with Spring Boot. In my use case, when I receive a customer event I need to store it in the customer-store materialized view, and when I receive an order event I need to join customer and order, then store the result in the customer-order materialized view.

// State store definition; in this snippet only its name ("customer-store") is used below.
StoreBuilder<KeyValueStore<String, Customer>> customerStateStore = Stores
        .keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"), Serdes.String(), customerSerde)
        .withLoggingEnabled(new HashMap<>());
// Forward customer events to an intermediate topic, then read that topic back as a KTable.
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde))
        .to("customer-to-ktable-topic", Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic",
        Consumed.with(Serdes.String(), customerSerde), Materialized.as(customerStateStore.name()));

Here is the problem: when I receive an Order event, my customerKTable returns null and the join operation becomes useless. This is not how it is supposed to work. My code is similar to the Kafka Music example, and I created a TestConsumer class to test this. The code is uploaded to GitHub for reference.

Answer 1:

This issue was caused by the way I declared the KTable. The KTable syntax I was using was syntactically correct but did not work. Refer to this question for more information. Changing the KTable syntax worked for me. Now customerKTable returns the events (objects) from the materialized view when an Order event arrives.
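
To illustrate why an empty table side makes the join useless, here is a minimal plain-Java model of the lookup that a stream-table join performs when an order arrives. It is only a sketch under stated assumptions: it uses no Kafka classes, a HashMap stands in for the customer-store, and the class name StreamTableJoinSketch and the methods onCustomer and onOrder are hypothetical names, not part of Kafka Streams or of the code in the question. It does not reproduce the KTable syntax change mentioned above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Kafka-free model of a stream-table (order-customer) join.
final class StreamTableJoinSketch {
    // Stand-in for the materialized "customer-store": latest customer name per key.
    private final Map<String, String> customerStore = new HashMap<>();

    // A customer event upserts the table side.
    public void onCustomer(String customerId, String customerName) {
        customerStore.put(customerId, customerName);
    }

    // An order event looks up the table side as it is at that moment.
    // Inner-join behaviour: no matching customer means no output record.
    public Optional<String> onOrder(String customerId, String orderId) {
        String customer = customerStore.get(customerId);
        if (customer == null) {
            return Optional.empty();
        }
        return Optional.of(customer + ":" + orderId);
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        // Order arrives before the customer is in the store: nothing to join with.
        System.out.println(join.onOrder("c1", "o1"));
        // Once the customer is stored, the same lookup succeeds.
        join.onCustomer("c1", "Alice");
        System.out.println(join.onOrder("c1", "o2"));
    }
}
```

In Kafka Streams itself, an inner KStream-KTable join likewise emits nothing for a stream record whose key has no entry in the table, while a left join passes null as the table value to the joiner. So if the table is not actually populated from the materialized store, every order sees an empty lookup, which matches the symptom described in the question.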