I have a topic with multiple partitions. In my stream processor I want to consume from only one of those partitions, and I could not figure out how to configure this.
spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processor
spring.cloud.stream.bindings.input.destination=uinput
spring.cloud.stream.bindings.input.group=r-processor
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.consumer.partitioned=true
@StreamListener(target = "input")
// @SendTo(value = { "uoutput" })
public void process(KStream<UUID, AModel> ustream) {
I want this processor to handle data from only one partition; other processors will handle the other partition(s).
So far, my finding is that this has something to do with https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsConfig.html#PARTITION_GROUPER_CLASS_CONFIG, but I could not find how to set this property in the Spring application.properties.
I think the partition grouper groups partitions into tasks within a single processor. If you want to ensure that a processor handles only a single partition, then you need to provide at least as many processor instances as the topic has partitions. For example, if your topic has 4 partitions, then you need 4 instances of the stream application so that each instance processes only a single partition.
Kafka Streams does not allow reading a single partition. If you subscribe to a topic, all partitions are consumed and distributed over the available instances. Thus, you can't know in advance which partition is assigned to which instance, and all instances execute the same code.
But each partition holds a different kind of data and hence requires a different processor application.
For this case, the processor (or transformer) must be able to process data for all partitions. Kafka Streams exposes the partition number via the ProcessorContext object that is handed to a processor via the init() method: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html#init-org.apache.kafka.streams.processor.ProcessorContext-
Thus, you need to "branch" within your transformer to apply different processing logic based on the partition:
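ustream.transform(() -> new MyTransformer());

class MyTransformer implements Transformer<K, V, R> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // keep the context to look up the partition later
    }

    // other methods omitted

    @Override
    public R transform(K key, V value) {
        switch (context.partition()) { // partition of the record being processed
            case 0:
                // your processing logic
                break;
            case 1:
                // your processing logic
                break;
            // ...
        }
        return null; // or return the result to emit downstream
    }
}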
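If the switch grows unwieldy, one possible alternative is a lookup table from partition number to handler. The following is only a sketch in plain Java with no Kafka dependency: PartitionDispatcher, register and dispatch are hypothetical names, not part of Kafka Streams, and in a real transformer the partition argument would presumably come from context.partition().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: routes a value to the handler registered for its partition. */
class PartitionDispatcher<V, R> {
    private final Map<Integer, Function<V, R>> handlers = new HashMap<>();

    /** Registers the processing logic for one partition; returns this for chaining. */
    PartitionDispatcher<V, R> register(int partition, Function<V, R> handler) {
        handlers.put(partition, handler);
        return this;
    }

    /** Applies the handler for the given partition, failing loudly if none exists. */
    R dispatch(int partition, V value) {
        Function<V, R> handler = handlers.get(partition);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for partition " + partition);
        }
        return handler.apply(value);
    }
}
```

Inside transform(), the switch body would then reduce to something like dispatcher.dispatch(context.partition(), value), with one register call per partition at construction time. Throwing on an unknown partition is a design choice for this sketch; returning null to drop the record would be another option.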