How to use interactive query within kafka process

2019-08-23 05:32发布

问题:

Is it possible to use interactive query (InteractiveQueryService) within Spring Cloud Stream the class with @EnableBinding annotation or within the method with @StreamListener? I tried instantiating ReadOnlyKeyValueStore within provided KStreamMusicSampleApplication class and process method but its always null.

My @StreamListener method is listening to a bunch of KTables and KStreams and during the process topology e.g filtering, I have to check whether the key from a KStream already exists in a particular KTable.

I tried to figure out how to scan an incoming KTable to check if a key already exists but no luck. Then I came across InteractiveQueryService whose get() method could be used to check if a key exists inside a state store materializedAs from a KTable. The problem is that I can't access it from with the process topology (@EnableBinding or @StreamListener). It can only be accessed from outside these annotation e.g RestController.

Is there a way to scan an incoming KTable to check for the existence of a key or value? if not then can we access InteractiveQueryService within the process topology?

回答1:

InteractiveQueryService in Spring Cloud Stream is not available to be used within the actual topology in your StreamListener. As you mentioned, it is supposed to be used outside of your main topology. However, with the use case you described, you still can use the state store from your main flow. For example, if you have an incoming KStream and a KTable which is materialized as a state store, then you can call process on the KStream and access the state store that way. Here is a rough code to achieve that. You need to convert this to fit into your specific use case, but here is the idea.

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");