I have a Processor that interacts with a StateStore to filter the messages and apply some complex logic to them. In the process(key, value) method I use context.forward(key, value) to send the keys and values that I need. For debugging purposes I also print those.
I have a KStream mergedStream that results from a join of two other streams. I want to apply the processor to the records of that stream. I achieve this with: mergedStream.process(myprocessor, "stateStoreName")
When I start this program, I can see the proper values printed to my console. However, if I send mergedStream to a topic using mergedStream.to("topic"), the values on the topic are not the ones I forwarded in the processor, but the original ones.
I use kafka-streams 0.10.1.0.
What is the best way to get the values I forwarded in the processor into another stream?
Is it possible to mix the Processor API with the streams created by the KStream DSL?