Spring cloud stream and consume multiple kafka top

2019-06-28 02:36发布

问题:

I have an issue with spring-boot-stream during some attempts for consume multiple topics in one @StreamListener.

According to spring-cloud-stream docs: Accordidng to docs:

destination

The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations and the destination names can be specified as comma separated String values. If not set, the channel name is used instead. The default value of this > property cannot be overridden.

But, after i had been using next config:

spring:
  cloud:
    stream:
      bindings:
        testchannel:
          group: test
          destination: platform.metrics, platform.sleuth

And now i had next error:

Caused by: java.lang.IllegalArgumentException: Topic name can only have ASCII alphanumerics, '.', '_' and '-'
    at org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils.validateTopicName(KafkaTopicUtils.java:39) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:107) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:60) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:110) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
    ... 20 common frames omitted

How to bind multiple topics to one @StreamListner or generate dynamic streamListeners from topic list?

回答1:

You simply need to replace the space between the comma and the next destination value and it will look like:

spring:
    cloud:
        stream:
            bindings:
                testchannel:
                    group: test
                    destination: platform.metrics,platform.sleuth


回答2:

Here I wrote more details on how to send messages to dynamic destinations and how to receive messages from dynamic destinations.

https://stackoverflow.com/a/56446574/4587961

As Varun Miglani said, topics are separated with comma without whitespaces.

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

To send a message dynamically, use BinderAwareChannelResolver and dynamicDestinations property.

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3