How to register a stateless processor (that seems

Published 2019-03-29 09:29

Question:

I'm building a topology and want to use KStream.process() to write some intermediate values to a database. This step doesn't change the nature of the data and is completely stateless.

Adding a Processor requires creating a ProcessorSupplier and passing that instance to KStream.process() along with the name of a state store. This is the part I don't understand.

How do I add a StateStore object to a topology, given that it requires a StateStoreSupplier?

Failing to add said StateStore produces this error when the application is started:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.

Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.

For reference, the Javadoc of KStream.process() describes the method as: "Process all elements in this stream, one element at a time, by applying a Processor."

Answer 1:

Here's a simple example on how to use state stores, taken from the Confluent Platform documentation on Kafka Streams.

Step 1: Defining the StateStore/StateStoreSupplier:

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();

Step 2: Adding the state store to your topology.

Option A - When using the Processor API:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");

Option B - When using the Kafka Streams DSL:

Here you need to call KStreamBuilder#addStateStore() with your StateStoreSupplier to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of the state store; otherwise your application will fail at runtime.

For example, with KStream#transform():

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());
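To see why the store name matters, here is a toy model of the rule described above, in plain Java with no Kafka dependency: every store name a processor declares must already have been registered, while a processor that declares no names needs nothing at all. ToyTopology and its methods are hypothetical names, not Kafka's classes or implementation; the error text only mimics the message quoted in the question.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the "store must be added before it is referenced" rule.
// NOT Kafka's implementation; ToyTopology is a hypothetical class.
class ToyTopology {
    // Store names registered so far (the role played by addStateStore()).
    private final Set<String> stores = new HashSet<>();
    // Processor name -> store names that processor declared it will use.
    private final Map<String, List<String>> processors = new LinkedHashMap<>();

    void addStateStore(String storeName) {
        stores.add(storeName);
    }

    // Store names are varargs, like the state store names of process():
    // calling this with no store names at all is perfectly valid.
    void addProcessor(String processorName, String... storeNames) {
        for (String store : storeNames) {
            if (!stores.contains(store)) {
                throw new IllegalStateException(
                    "Invalid topology building: StateStore " + store + " is not added yet.");
            }
        }
        processors.put(processorName, Arrays.asList(storeNames.clone()));
    }

    List<String> storesOf(String processorName) {
        return processors.get(processorName);
    }
}
```

A stateless processor registered with no store names passes the check; referencing "my-state-store" before registering it fails with the same kind of message as in the question.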

As for why a processor would need a state store at all: you are right, you don't need a state store if your processor does not maintain state.

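When using the DSL, simply call KStream#process() with only your ProcessorSupplier. The state store names are a varargs parameter (KStream#process(ProcessorSupplier, String...)), so you can omit them entirely. KStreamBuilder#addStateStore() is only needed for stores you actually reference by name; passing a name that was never added is what triggers the "StateStore ... is not added yet" error.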
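A minimal sketch of the stateless case, assuming the pre-2.0 DSL used elsewhere in this answer (KStreamBuilder and AbstractProcessor from the Kafka Streams 0.10/1.x era) on the classpath, String keys and values, and default serdes configured in your StreamsConfig. DatabaseWriterProcessor and the BiConsumer standing in for your database client are hypothetical names; no state store is created and no store name is passed to process().

```java
import java.util.function.BiConsumer;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;

class StatelessDbSink {

    // Stateless processor: keeps no state and forwards nothing downstream;
    // it only hands each record to a (hypothetical) database writer.
    static class DatabaseWriterProcessor extends AbstractProcessor<String, String> {
        private final BiConsumer<String, String> writer;

        DatabaseWriterProcessor(BiConsumer<String, String> writer) {
            this.writer = writer;
        }

        @Override
        public void process(String key, String value) {
            writer.accept(key, value);
        }
    }

    // Wires the processor into a DSL topology. No addStateStore() call and no
    // store names: the varargs state store names of process() are simply omitted.
    static void wire(KStreamBuilder builder, BiConsumer<String, String> writer) {
        KStream<String, String> input = builder.stream("source-topic");
        input.process(() -> new DatabaseWriterProcessor(writer));
    }
}
```

Keeping the writer behind a BiConsumer is only a design choice for the sketch: it lets you test the processor by calling process() directly, without a running Kafka cluster.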