I'm trying to set an Spring Boot Application listening to Kafka.
I'm using Kafka Streams Binder.
With one simple @EnableBinding
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}
interface StreamProcessor {
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
}
}
and in application.yml
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
bindings:
input_1:
destination: mytopic1
group: readgroup
output_1:
destination: mytopic2
input_2:
destination: mytopic3
group: readgroup
output_2:
destination: mytopic4
application:
name: stream_s1000_app
Everything works fine.
But if I try to add a second class with other binding, the following error occurs:
The following subscribed topics are not assigned to any members: [mytopic1]
Example of the second binding:
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input) {
logger.info("Stream listening binding two");
return input
.peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
}
interface StreamProcessor {
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
}
}
What I'm missing? Can't I use multiple input topics and multiple output in the same application? There is something related to application.name?