Spring Kafka: Multiple Listeners for different obj

2020-07-05 06:13发布

Can I please check with the community what is the best way to listen to multiple topics, with each topic containing a message of a different class?

I've been playing around with Spring Kafka for the past couple of days. My thought process so far:

  • Because you need to pass your deserializer into DefaultKafkaConsumerFactory when initializing a KafkaListenerContainerFactory. This seems to indicate that if I need multiple containers each deserializing a message of a different type, I will not be able to use the @EnableKafka and @KafkaListener annotations.

  • This leads me to think that the only way to do so would be to instantiate multiple KafkaMessageListenerContainers.

  • And given that KafkaMessageListenerContainers is single threaded and I need to listen to multiple topics at the same time, I really should be using multiple ConcurrentKafkaMessageListenerContainers.

Would I be on the right track here? Is there a better way to do this?


Deceive 欺骗
2楼-- · 2020-07-05 06:15

Here is a very simple example.

// -----------------------------------------------
// Sender
// -----------------------------------------------

public class SenderConfig {
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;

    public ProducerFactory<String, Class1> producerFactory1() {
        return new DefaultKafkaProducerFactory<String, Class1>(producerConfigs());

    public KafkaTemplate<String, Class1> kafkaTemplate1() {
        return new KafkaTemplate<>(producerFactory1());

    public Sender1 sender1() {
        return new Sender1();

    //-------- send the second class --------

    public ProducerFactory<String, Class2> producerFactory2() {
        return new DefaultKafkaProducerFactory<String, Class2>(producerConfigs());

    public KafkaTemplate<String, Class2> kafkaTemplate2() {
        return new KafkaTemplate<>(producerFactory2());

    public Sender2 sender2() {
        return new Sender2();

public class Sender1 {
    private KafkaTemplate<String, Class1> kafkaTemplate1;

    public void send(String topic, Class1 c1) {
        kafkaTemplate1.send(topic, c1);

public class Sender2 {
    private KafkaTemplate<String, Class2> kafkaTemplate2;

    public void send(String topic, Class2 c2) {
        kafkaTemplate2.send(topic, c2);

// -----------------------------------------------
// Receiver
// -----------------------------------------------

public class ReceiverConfig {

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;

    public ConsumerFactory<String, Class1> consumerFactory1() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class1.class));

    public ConcurrentKafkaListenerContainerFactory<String, Class1> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, Class1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public Receiver1 receiver1() {
        return new Receiver1();

    //-------- add the second listener

    public ConsumerFactory<String, Class2> consumerFactory2() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Class2.class));

    public ConcurrentKafkaListenerContainerFactory<String, Class2> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, Class2> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public Receiver2 receiver2() {
        return new Receiver2();

public class Receiver1 {
    @KafkaListener(id="listener1", topics = "topic1", containerFactory = "kafkaListenerContainerFactory1")
    public void receive(Class1 c1) {
        LOGGER.info("Received c1");

public class Receiver2 {
    @KafkaListener(id="listener2", topics = "topic2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(Class2 c2) {
        LOGGER.info("Received c2");
3楼-- · 2020-07-05 06:26

I would like to use the code below to apply your sence

public class ConsumerConfig {

    private String bootstrapServers;

    private String groupId;

     * Configuration of Consumer properties.
     * @return
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return props;

    public ConsumerFactory<String, ClassA> consumerFactory1() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ClassA());

     * Kafka Listener Container Factory.
     * @return
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassA>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, ClassA> factory;
        factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ConsumerFactory<String, ClassB> consumerFactory2() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ClassB());

     * Kafka Listener Container Factory.
     * @return
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ClassB>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, ClassB> factory;
        factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ReceiverClass receiver() {
        return new ReceiverClass();

    class ReceiverClass {
        @KafkaListener(topics = "topic1", group = "group-id-test",
                containerFactory = "kafkaListenerContainerFactory1")
        public void receiveTopic1(ClassA a) {
            System.out.println("ReceiverClass.receive() ClassA : " + a);

        @KafkaListener(topics = "topic2", group = "group-id-test",
                containerFactory = "kafkaListenerContainerFactory2")
        public void receiveTopic2(ClassB b) {
            System.out.println("ReceiverClass.receive() Classb : " + b);


    class ClassB  implements Deserializer  {

        public void configure(Map configs, boolean isKey) {
            // TODO Auto-generated method stub


        public Object deserialize(String topic, byte[] data) {
            // TODO Auto-generated method stub
            return null;

        public void close() {
            // TODO Auto-generated method stub



    class ClassA  implements Deserializer {

        public void configure(Map configs, boolean isKey) {
            // TODO Auto-generated method stub


        public Object deserialize(String topic, byte[] data) {
            // TODO Auto-generated method stub
            return null;

        public void close() {
            // TODO Auto-generated method stub


\"骚年 ilove
4楼-- · 2020-07-05 06:33

You can use the annotations, you would just need to use a different listener container factory for each.

The framework will create a listener container for each annotation.

You can also listen to multiple topics on a single-threaded container but they would be processed, er, on a single thread.

Take a look at the code from my SpringOne Platform talk last year - you might want to look at app6, which shows how to use a MessageConverter instead of a deserializer, which might help simplify your configuration.

登录 后发表回答