How can I have Spring integration channel dispatch

Posted 2019-07-26 01:52

Question:

I am new to Kafka and am trying to build a Spring Boot app that accepts a REST request and posts it to a Kafka topic. The relevant code snippets are below. The request is received and then sent to the testlogchannel channel. Based on my configuration, I expected the payload to be published as JSON to the Kafka topic. I am consuming messages with the command bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testlog --from-beginning. However, the messages never reach the Kafka topic; instead they end up in a test binder of type TestSupportBinder. The Kafka binder is never used, even though I have configured it in application.yml and pom.xml. I use Java/annotation-based configuration and no XML.

Can someone point out what I am missing here? How can I have the channel dispatch messages to Kafka and use the Kafka binder?

My application.yml snippet is below:

spring:
    cloud:
        stream:
            default-binder: kafka
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                bindings:
                    testlogchannel:
                        destination: testlog
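
One thing worth checking in the snippet above is which property key the indentation produces. In Spring Cloud Stream, a binding's destination is read from spring.cloud.stream.bindings.<channelName>.destination, while keys under spring.cloud.stream.kafka.bindings hold Kafka-specific producer/consumer settings. The sketch below is plain Java with no Spring dependency. The BindingKeys class and its map and flatten helpers are hypothetical names invented for illustration, and the nested map is a hand-copied stand-in for the YAML above. It only shows the dotted key that the nesting as posted resolves to, and does not by itself explain why TestSupportBinder is selected.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring: flattens nested maps into dotted
// property keys, the shape in which nested YAML is exposed as properties.
final class BindingKeys {

    // Builds an insertion-ordered map from alternating key/value arguments.
    static Map<String, Object> map(Object... kv) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put((String) kv[i], kv[i + 1]);
        }
        return m;
    }

    // Recursively joins nested keys with dots.
    static Map<String, Object> flatten(String prefix, Map<String, Object> node) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                out.putAll(flatten(key, child));
            } else {
                out.put(key, e.getValue());
            }
        }
        return out;
    }

    // Hand-written copy of the YAML nesting as posted in the question.
    static Map<String, Object> postedConfig() {
        return map("spring", map("cloud", map("stream", map(
                "default-binder", "kafka",
                "kafka", map(
                        "binder", map("brokers", "localhost", "zk-nodes", "localhost"),
                        "bindings", map("testlogchannel", map("destination", "testlog")))))));
    }

    public static void main(String[] args) {
        // Print every flattened key so the position of "destination" is visible.
        flatten("", postedConfig()).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

If the destination is meant to be the generic binding property, the bindings block would sit directly under stream rather than under kafka. Whether that alone changes which binder is selected is not established by the question.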

Producer channel interface:

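import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Declares the output channel named "testlogchannel".
public interface TestLogProducerChannel {

    @Output("testlogchannel")
    MessageChannel testLogWrite();
}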

Message sending component:

@Component
public class MessageProducerUtil {

    private static final Logger LOG = LoggerFactory.getLogger(MessageProducerUtil.class);

    private final MessageChannel consumer;

    public MessageProducerUtil(TestLogProducerChannel channel) {
        this.consumer = channel.testLogWrite();
    }

    public void sendMessage(TestLog log) {
        Message<TestLog> msg = MessageBuilder.withPayload(log).build();
        LOG.debug("Sending log object {}", log);
        this.consumer.send(msg);
    }

}

My Maven dependencies are as follows:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-loader-tools</artifactId>
    </dependency>
    <!-- Kafka support -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>
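
Because the messages land in TestSupportBinder, it may help to confirm whether that class is actually visible on the classpath the application runs with, for example when launching from an IDE that merges test and main classpaths. The following is a minimal sketch in plain Java. ClasspathProbe is a hypothetical helper, and the fully qualified name org.springframework.cloud.stream.test.binder.TestSupportBinder is an assumption based on the spring-cloud-stream-test-support module that should be checked against the version in use.

```java
// Hypothetical diagnostic helper, not part of Spring.
final class ClasspathProbe {

    // Returns true if the named class can be located by this class's loader.
    // The class is looked up without being initialized.
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class name; verify it against the test-support jar in use.
        String testBinder = "org.springframework.cloud.stream.test.binder.TestSupportBinder";
        System.out.println("Test binder on classpath: " + isPresent(testBinder));
    }
}
```

Calling ClasspathProbe.isPresent(...) from the application's own startup code, rather than running main standalone, reflects the classpath the app actually uses.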

My REST controller:

@RestController
@RequestMapping("/api/app/v1.0")
public class TestLogResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestLogResource.class);

    @Autowired
    private MessageProducerUtil producer;

    @PostMapping("testlog")
    public ResponseEntity<TestLog> postTestLog(@RequestBody(required = true) TestLog testLog) {
        producer.sendMessage(testLog);
        return new ResponseEntity<>(testLog, HttpStatus.OK);
    }
}