Consume multiple topics in one listener in Spring

Posted 2019-08-20 20:08

Can anyone provide a small Spring Boot Kafka example that consumes multiple topics in a single listener class?

2 answers
SAY GOODBYE
#2 · 2019-08-20 20:16

For consumers that are part of a consumer group, list the topics in the topics attribute:

// "topic2" is a placeholder name for the second topic
@KafkaListener(topics = {"topic1", "topic2"})
public void listen(@Payload KafkaBinding record, MessageHeaders headers)
        throws ExecutionException, InterruptedException {
    // ...
}

For consumers that are assigned specific partitions manually, you can use the following:

@KafkaListener(id = "foo", topicPartitions = {
        @TopicPartition(topic = "myTopic", partitions = {"1"})
})
public void listen(@Payload KafkaBinding record, MessageHeaders headers)
        throws ExecutionException, InterruptedException {
    // ...
}

走好不送
#3 · 2019-08-20 20:33

application.yml

my:
    kafka:
        conf:
            groupId: myId
            topics: topic1,topicN
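
The topics value here is one comma-separated string, so it has to be split into individual topic names before it can serve as a topic list. The plain-Java sketch below shows what a bare split(",") produces and why stray spaces in the property matter. The class name TopicListParsing and the helper splitClean are hypothetical, introduced only for illustration; they are not part of Spring or Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TopicListParsing {

    // Plain String.split(","): no trimming, so "a, b" yields "a" and " b".
    public static String[] splitRaw(String value) {
        return value.split(",");
    }

    // Hypothetical helper: split, trim each name, and drop empty entries.
    public static List<String> splitClean(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Value written without spaces, as in the YAML above.
        System.out.println(Arrays.toString(splitRaw("topic1,topicN")));
        // Value written with a space after the comma: the second name keeps it.
        System.out.println(Arrays.toString(splitRaw("topic1, topicN")));
        // Trimmed variant tolerates spaces and a trailing comma.
        System.out.println(splitClean("topic1, topicN,"));
    }
}
```

If the property might be edited by hand, writing it without spaces (as above) or trimming the names avoids subscribing to a topic whose name starts with a space.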

Your listener:

@KafkaListener(groupId = "${my.kafka.conf.groupId}", topics = "#{'${my.kafka.conf.topics}'.split(',')}")
public void storeTopicsDataToMongo(
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Payload(required = false) String record)
{
    log.trace(format("Received topic[%s] key[%s] payload[%s]", topic, key, record));
    //your code
}
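
Because one method now receives records from every configured topic, the "your code" part usually has to branch on the topic name. One possible way to organize that, sketched in plain Java without any Spring or Kafka dependency, is a map from topic name to handler. TopicDispatcher and its methods are hypothetical names for illustration; in the listener, dispatch would be called with the topic header value and the payload.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TopicDispatcher {

    // One handler per topic name.
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    // Topics that arrived without a registered handler.
    private final List<String> unhandled = new ArrayList<>();

    public void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    // Routes a payload to the handler registered for its topic.
    // The payload may be null, since the listener declares it as not required.
    public void dispatch(String topic, String payload) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            unhandled.add(topic);
            return;
        }
        handler.accept(payload);
    }

    public List<String> unhandledTopics() {
        return unhandled;
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher();
        List<String> seen = new ArrayList<>();
        dispatcher.register("topic1", p -> seen.add("topic1:" + p));
        dispatcher.register("topicN", p -> seen.add("topicN:" + p));

        dispatcher.dispatch("topic1", "a");
        dispatcher.dispatch("topicN", "b");
        dispatcher.dispatch("other", "c"); // no handler registered

        System.out.println(seen);
        System.out.println(dispatcher.unhandledTopics());
    }
}
```

For two or three topics a simple if/else or switch on the topic name is just as reasonable; the map mainly pays off when the topic list comes from configuration.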

Alternatively, you can use @ConfigurationProperties to bind the settings to a bean and create the consumer beans yourself, something like:

@Component
@ConfigurationProperties(prefix = "my.kafka.conf")
@Data //=> lombok
public class ConsumerConfigurationProperties {

    private String groupId;
    private List<String> topics;
}