Spring @StreamListener process(KStream<?,?>

Posted 2019-08-20 11:22

Question:

I have a topic with multiple partitions. In my stream processor I want to stream from just one of those partitions, and I could not figure out how to configure this.

spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processor
spring.cloud.stream.bindings.input.destination=uinput
spring.cloud.stream.bindings.input.group=r-processor
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true

spring.cloud.stream.bindings.input.consumer.partitioned=true

@StreamListener(target = "input")
// @SendTo(value = { "uoutput" })
public void process(KStream<UUID, AModel> ustream) {

I want only one partition's data to be processed by this processor; there will be other processors for the other partition(s).

So far my finding is that it has something to do with https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsConfig.html#PARTITION_GROUPER_CLASS_CONFIG, but I could not find how to set this property in the Spring application.properties.

Answer 1:

I think the partition grouper only controls how partitions are grouped into tasks within a single processor. If you want to ensure that each processor handles only a single partition, you need to run at least as many processor instances as the topic has partitions. For example, if your topic has 4 partitions, you need 4 instances of the stream application so that each instance processes a single partition.
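To make the instance-count argument concrete, here is a minimal sketch in plain Java. It is a simplified model, not Kafka's actual assignor: it assumes one task per input partition and deals the partitions out round-robin. The class name `PartitionSpread` and its `assign` method are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: Kafka Streams' real assignment logic is more involved.
// This just shows how many partitions each instance ends up owning.
class PartitionSpread {

    // Returns, for each instance, the partitions it would own if the
    // partitions were dealt out round-robin across the instances.
    static List<List<Integer>> assign(int partitions, int instances) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % instances).add(p);
        }
        return owned;
    }
}
```

In this model `PartitionSpread.assign(4, 4)` gives every instance exactly one partition, while `PartitionSpread.assign(4, 2)` gives each instance two. Even with matching counts, the model says nothing about which instance receives which partition.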



Answer 2:

Kafka Streams does not let you read from a single partition. If you subscribe to a topic, all partitions are consumed and distributed over the available instances. Thus, you cannot know in advance which partition is assigned to which instance, and all instances execute the same code.

But each partition linked to a processor holds a different kind of data and hence requires a different processor application

For this case, the processor (or transformer) must be able to process data for all partitions. Kafka Streams exposes the partition number via the ProcessorContext object that is handed to a processor via the init() method: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html#init-org.apache.kafka.streams.processor.ProcessorContext-

Thus, you need to "branch" within your transformer to apply different processing logic based on the partition:

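ustream.transform(() -> new MyTransformer());


class MyTransformer implements Transformer<UUID, AModel, KeyValue<UUID, AModel>> {
  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    this.context = context; // keep the context so transform() can read the partition
  }

  @Override
  public KeyValue<UUID, AModel> transform(UUID key, AModel value) {
    switch (context.partition()) { // partition of the record being processed
      case 0:
        // your processing logic for partition 0
        break;
      case 1:
        // your processing logic for partition 1
        break;

      // ...
    }
    return KeyValue.pair(key, value); // placeholder: forwards the record unchanged
  }

  // close() omitted
}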
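If the number of partitions grows, the switch can be swapped for a lookup table keyed by partition number. The following is a minimal plain-Java sketch of that alternative, with no Kafka dependency. The class `PartitionDispatcher`, its methods `on` and `handle`, and the use of `String` values are all hypothetical stand-ins; in a real transformer the partition argument would come from `context.partition()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java stand-in for the switch inside transform(): no Kafka types,
// the partition number is passed in explicitly.
class PartitionDispatcher {
    private final Map<Integer, UnaryOperator<String>> handlers = new HashMap<>();

    // Registers the processing logic for one partition.
    PartitionDispatcher on(int partition, UnaryOperator<String> handler) {
        handlers.put(partition, handler);
        return this;
    }

    // Applies the handler registered for the partition; values from
    // partitions without a handler pass through unchanged.
    String handle(int partition, String value) {
        return handlers.getOrDefault(partition, UnaryOperator.identity()).apply(value);
    }
}
```

A dispatcher built with `new PartitionDispatcher().on(0, String::toUpperCase)` would upper-case values from partition 0 and leave values from every other partition untouched. Whether unhandled partitions should pass through or raise an error is a design choice left to the application.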