Registering a listener service with setMessageList

Posted 2019-08-20 07:35

Question:

I have a Spring AMQP app, but I can't work out how to register my listener service. This is my annotation-based configuration file:

package com.jerry.configuration;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class SpringAmqpConfiguration {

    protected final String helloWorldQueueName = "hello.world.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        //The routing key is set to the name of the queue by the broker for the default exchange.
        template.setRoutingKey(this.helloWorldQueueName);
        //Where we will synchronously receive messages from
        template.setQueue(this.helloWorldQueueName);
        return template;
    }

    @Bean
    // Every queue is bound to the default direct exchange
    public Queue helloWorldQueue() {
        return new Queue(this.helloWorldQueueName);
    }

    // No explicit Binding bean is needed here: the broker automatically binds every
    // queue to the default exchange, using the queue name as the routing key.

    //https://docs.spring.io/spring-amqp/docs/1.5.0.BUILD-SNAPSHOT/reference/html/_reference.html#async-annotation-driven

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        //If you want a fixed number of consumers, omit the max.
        factory.setMaxConcurrentConsumers(10);
        //Set message listener : container.setMessageListener(this.myFirstListener);
        return factory;
    }


    /**
    * Listener Bean
    * Threadpool 
    *     
    <bean id="messageListener" class="com.spring.rabbitmq.service.MessageListenerService" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="messageListener" queues="my.first.queue" />
    </rabbit:listener-container>

    */


}

This is the listener service I wish to use:

package com.jerry.services;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;


public class MessageListenerService implements MessageListener {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MessageListenerService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info(message.getMessageProperties().toString());
        LOGGER.info(new String(message.getBody()));
    }
}

How do I register the listener service in this bean, and how would I go about registering multiple listeners that listen to different queues?

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    //If you want a fixed number of consumers, omit the max.
    factory.setMaxConcurrentConsumers(10);
    //Set message listener : container.setMessageListener(this.myFirstListener);
    return factory;
}

Answer 1:

"How do I register the listener service in this bean?"

Define a @Bean for MessageListenerService, then define a SimpleMessageListenerContainer @Bean and pass the listener to it with setMessageListener().

The container factory is not meant for user-defined listeners; it creates the containers for @RabbitListener-annotated POJO methods.
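
As a rough sketch of that approach, the configuration below wires the question's MessageListenerService into its own container. The class name ListenerConfiguration and the bean names are made up for illustration, and it assumes the connectionFactory bean and the hello.world.queue queue from the question's configuration are present in the same application context.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.jerry.services.MessageListenerService;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class ListenerConfiguration {

    // Expose the listener from the question as a Spring bean.
    @Bean
    public MessageListenerService messageListenerService() {
        return new MessageListenerService();
    }

    // The container owns the consumers and hands each message to the listener.
    @Bean
    public SimpleMessageListenerContainer helloWorldListenerContainer(
            ConnectionFactory connectionFactory,
            MessageListenerService messageListenerService) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        // Queue name taken from the question's configuration.
        container.setQueueNames("hello.world.queue");
        container.setMessageListener(messageListenerService);
        // Same concurrency settings the question applied to the factory.
        container.setConcurrentConsumers(3);
        container.setMaxConcurrentConsumers(10);
        return container;
    }
}
```

Because the container is itself a bean, Spring starts it with the application context and the listener begins receiving messages without any further registration step.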

"...and how would I go about registering multiple listeners listening to different queues?"

If you want the same listener to listen to multiple queues, then set the queue names in the container.

If you want different listeners for each queue, you'll need a SimpleMessageListenerContainer for each.
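
The one-container-per-listener option might look like the sketch below. It is only an illustration: the class and bean names are invented, the second listener is a hypothetical lambda, and my.first.queue (the name used in the XML comment in the question) is assumed to be declared elsewhere.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.jerry.services.MessageListenerService;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class MultipleListenerConfiguration {

    // First container: the question's listener on hello.world.queue.
    // To reuse one listener for several queues, pass more names to setQueueNames().
    @Bean
    public SimpleMessageListenerContainer helloWorldContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("hello.world.queue");
        container.setMessageListener(new MessageListenerService());
        return container;
    }

    // Second container: a different (hypothetical) listener on another queue.
    @Bean
    public SimpleMessageListenerContainer firstQueueContainer(ConnectionFactory connectionFactory) {
        // Typed as MessageListener so the lambda has an explicit target type.
        MessageListener printingListener = message ->
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));

        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        // Assumed queue name, borrowed from the XML comment in the question.
        container.setQueueNames("my.first.queue");
        container.setMessageListener(printingListener);
        return container;
    }
}
```

Each container has its own consumers and lifecycle, so the two listeners can be tuned or stopped independently.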

Or, use @RabbitListener annotations instead.



Answer 2:

For your listener class you should do something like this:

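import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@EnableRabbit
@Service
public class MessageListenerService {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MessageListenerService.class);

    // Invoked for each message that arrives on the configured queue.
    @RabbitListener(queues = { Constant.QUEUE_NAME })
    public void onMessage(Message message) {
        LOGGER.info(message.getMessageProperties().toString());
        LOGGER.info(new String(message.getBody()));
    }
}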
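
To cover the "different queues" part of the question with annotations, a sketch along these lines may help. The class names RabbitListenerSetup and QueueListeners are invented, and both queue names are assumed to exist on the broker. With @EnableRabbit in place, annotated methods use the container factory bean named rabbitListenerContainerFactory by default, which is the name the question's factory bean already has.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

// Hypothetical configuration class that switches on @RabbitListener processing.
@Configuration
@EnableRabbit
class RabbitListenerSetup {
}

// Hypothetical service: one annotated method per queue.
@Service
class QueueListeners {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueueListeners.class);

    // Queue name from the question's configuration.
    @RabbitListener(queues = "hello.world.queue")
    public void onHelloWorld(Message message) {
        LOGGER.info("hello.world.queue: {}", new String(message.getBody()));
    }

    // Assumed queue name, borrowed from the XML comment in the question.
    @RabbitListener(queues = "my.first.queue")
    public void onFirstQueue(Message message) {
        LOGGER.info("my.first.queue: {}", new String(message.getBody()));
    }
}
```

A container is created behind the scenes for each annotated method, so the concurrency settings on the factory apply to every listener unless overridden.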