I am new to Kafka and am trying to create a Spring Boot app that takes a REST request and posts it to a Kafka topic. The relevant code snippets are below. The request is received and then sent to the testlogchannel channel. Based on my configuration, I expected the payload to be posted as JSON to the Kafka topic. I have a process consuming the messages with the command bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testlog --from-beginning. However, the messages never reach the Kafka topic; they end up in a test binder of type TestSupportBinder instead. Surprisingly, the Kafka binder is never used, even though I have it configured in application.yml and pom.xml. I use Java/annotation-based configuration and no XML.
Can someone point out what I am missing here? How can I make the channel dispatch messages to Kafka and use the Kafka binder?
My application.yml snippet is below:
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost
          zk-nodes: localhost
      bindings:
        testlogchannel:
          destination: testlog
Producer Channel class:
public interface TestLogProducerChannel {
    @Output("testlogchannel")
    MessageChannel testLogWrite();
}
Message sending component:
@Component
public class MessageProducerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(MessageProducerUtil.class);
    private final MessageChannel consumer;

    public MessageProducerUtil(TestLogProducerChannel channel) {
        this.consumer = channel.testLogWrite();
    }

    public void sendMessage(TestLog log) {
        Message<TestLog> msg = MessageBuilder.withPayload(log).build();
        LOG.debug("Sending log object {}", log);
        this.consumer.send(msg);
    }
}
My Maven dependencies are as follows:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-loader-tools</artifactId>
</dependency>
<!-- Kafka support -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
My Rest controller:
@RestController
@RequestMapping("/api/app/v1.0")
public class TestLogResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestLogResource.class);

    @Autowired
    private MessageProducerUtil producer;

    @PostMapping("testlog")
    public ResponseEntity<TestLog> postTestLog(@RequestBody(required = true) TestLog testLog) {
        producer.sendMessage(testLog);
        return new ResponseEntity<>(testLog, HttpStatus.OK);
    }
}
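The TestLog payload class is not shown in the snippets above. As a minimal sketch, assuming it is a plain POJO that can be bound from the request body and serialized to JSON, it could look like the following. The field names level and message are hypothetical and only illustrate the shape of the payload.

```java
// Hypothetical stand-in for the TestLog payload; the real class is not shown.
// The fields "level" and "message" are assumptions for illustration only.
class TestLog {
    private String level;
    private String message;

    // No-argument constructor, commonly needed for JSON deserialization.
    public TestLog() {
    }

    public TestLog(String level, String message) {
        this.level = level;
        this.message = message;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "TestLog{level='" + level + "', message='" + message + "'}";
    }
}
```

With a class of this shape, the body posted to /api/app/v1.0/testlog would be a JSON object with those two properties.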