How to filter keys and value with a Processor usin

Posted 2019-01-20 12:22

Question:

I have a Processor that interacts with a StateStore to filter the messages and apply complex logic to them. In the process(key,value) method I use context.forward(key,value) to send the keys and values that I need. For debugging purposes I also print them.

I have a KStream mergedStream that results from a join of two other streams. I want to apply the processor to the records of that stream. I do this with: mergedStream.process(myprocessor,"stateStoreName")

When I start this program, I can see the proper values printed to my console. However, if I send the mergedStream to a topic using mergedStream.to("topic"), the values on the topic are not the ones I forwarded in the processor, but the original ones.

I use kafka-streams 0.10.1.0.

What is the best way to get the values I have forwarded in the processor into another stream?

Is it possible to mix the Processor API with the streams created by the KStream DSL?

Answer 1:

Short:

To solve your problem, you can use transform(...) instead of process(...). It also gives you access to the Processor API within the DSL.

Long:

If you use process(...), you apply a processor to a stream. However, this is a "terminating" (or sink) operation: its return type is void, so it does not return any result. Here "sink" only means that the operator has no successor; it does not imply that any result is written anywhere.

Furthermore, if you call both mergedStream.process(...) and mergedStream.to(...), you basically branch and duplicate your stream and send one copy to each downstream operator (i.e., one copy to process and one copy to to).
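To make the branching concrete, here is a toy model in plain Java. It does not use any Kafka classes: Node, BranchModel and topicContents are hypothetical names invented for this illustration, and the upper-casing stands in for whatever your processor does. It only shows why the second branch never sees what the first branch forwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BranchModel {

    /** Toy stand-in for a stream node: each record is handed to every child. */
    static final class Node {
        private final List<Consumer<String>> children = new ArrayList<>();

        void addChild(Consumer<String> child) {
            children.add(child);
        }

        void emit(String value) {
            for (Consumer<String> child : children) {
                child.accept(value);
            }
        }
    }

    /** Returns what the toy "topic" receives for the given input records. */
    static List<String> topicContents(List<String> input) {
        Node mergedStream = new Node();
        Node processorOutput = new Node(); // no children: forwarded records go nowhere
        List<String> topic = new ArrayList<>();

        // Branch 1, like process(...): modifies the record and forwards it into a dead end.
        mergedStream.addChild(v -> processorOutput.emit(v.toUpperCase()));
        // Branch 2, like to("topic"): a second child fed directly by mergedStream.
        mergedStream.addChild(topic::add);

        for (String record : input) {
            mergedStream.emit(record);
        }
        return topic;
    }

    public static void main(String[] args) {
        // The topic holds the original records, not the upper-cased ones.
        System.out.println(topicContents(Arrays.asList("a", "b"))); // prints [a, b]
    }
}
```

In this model, the only way for the topic to see the modified records is to attach it as a child of processorOutput rather than of mergedStream, which is what chaining onto the result of transform(...) does in the real DSL.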

Mixing the DSL and the Processor API is absolutely possible (you did it already ;)). However, with process(...) you cannot consume the data you forward(...) within the DSL. If you want to consume the Processor API result, use transform(...) instead of process(...).
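A minimal sketch of what that could look like, assuming String keys and values and the 0.10.x interfaces (where Transformer still declares punctuate(long)); newer releases changed these interfaces, so check the Javadoc for your version. MyTransformer, TransformSketch and wire are hypothetical names, the logic inside transform(...) is only a placeholder for your own, and the store is assumed to be a KeyValueStore already registered as "stateStoreName".

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TransformSketch {

    /** Hypothetical transformer; replace the body of transform(...) with your own logic. */
    static class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // Assumes a key-value store with this name is attached to the operator.
            store = (KeyValueStore<String, String>) context.getStateStore("stateStoreName");
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            // Placeholder logic: remember the latest value per key and emit
            // the previous one alongside the new one.
            String previous = store.get(key);
            store.put(key, value);
            return new KeyValue<>(key, previous + "->" + value);
        }

        @Override
        public KeyValue<String, String> punctuate(long timestamp) {
            return null; // nothing to emit on punctuation in this sketch
        }

        @Override
        public void close() {
            // no resources to release
        }
    }

    /** Chains to(...) onto the stream returned by transform(...), not onto mergedStream. */
    static void wire(KStream<String, String> mergedStream) {
        KStream<String, String> transformed =
                mergedStream.transform(() -> new MyTransformer(), "stateStoreName");
        transformed.to("topic");
    }
}
```

The important part is the last method: to("topic") is called on the stream that transform(...) returns, so the topic receives the transformer's output instead of a copy of mergedStream.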