How to listen to multiple queues with autowired Sp

Posted 2019-05-14 23:32

I'm new to Spring Boot and I'm playing around with it. I've built some applications that I want to communicate with each other through queues. I currently have a Listener object that can receive messages from one particular queue.

@Configuration
public class Listener {

    final static String queueName = "myqueue";

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

This works. However, I now want to listen to another queue as well, so I copied the class above and changed the queue name. Unfortunately this did not work: Spring Boot only creates a connection for one of them. Any ideas on how I can have my Spring Boot application listen to multiple queues?

3 answers
姐就是有狂的资本
#2 · 2019-05-15 00:00

You can try this

In application.properties

rabbitmq.queue.names= com.queue1,com.queue2

In Java file

@RabbitListener(queues = "#{'${rabbitmq.queue.names}'.split(',')}")
public void receiveMessage(Message message) {
    try {
        // processmessage(...) and LOGGER are defined elsewhere in this class
        processmessage(message);
    } catch (Exception ex) {
        LOGGER.error("Exception while processing the Message", ex);
    }
}
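
A note on the split expression above: it is an ordinary String.split(",") applied to the resolved property value, so any whitespace around the commas stays in the resulting names. The plain-Java sketch below shows this without Spring. The class name QueueNamesDemo and the helper splitTrimmed are made up for illustration and are not part of Spring. Whether the listener tolerates a padded name is something I have not checked, so keeping the property free of spaces seems the safer choice.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class QueueNamesDemo {

    // Same operation the SpEL expression performs: String.split on the raw value.
    static String[] splitRaw(String value) {
        return value.split(",");
    }

    // Hypothetical helper (not a Spring API): trims each entry and drops empty ones.
    static List<String> splitTrimmed(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Reads one key from .properties-style text using java.util.Properties.
    static String loadProperty(String fileContent, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // Note the space after '=' and after ',' in the sample line.
        String value = loadProperty(
                "rabbitmq.queue.names= com.queue1, com.queue2\n",
                "rabbitmq.queue.names");
        // The space after '=' is dropped by the properties loader; the one after ',' is not.
        System.out.println(Arrays.toString(splitRaw(value)));
        System.out.println(splitTrimmed(value));
    }
}
```

If trimming is wanted inside the annotation itself, one option is to keep the property free of spaces rather than making the SpEL expression more complicated.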
够拽才男人
#3 · 2019-05-15 00:03

Here's what worked for me in Groovy:

@Component
@EnableRabbit
@Slf4j
class StatusListener {
    Library library
    int messageCounter

    @Autowired
    StatusListener(Library library) {
        this.library = library
    }

    @RabbitListener(queues = '#{library.allStatusQueues.split(",")}')
    void receiveMessage(Message message) {
        messageCounter++
        log.info("Rabbit Listener received message <" + new String(message.body) + "> (" + messageCounter + ")")
    }
}

Where Library is a configuration bean:

@Component
@ConfigurationProperties
@RefreshScope
class Library {
    String allStatusQueues
}

The property itself, in application.properties or a similar config file, looks like:

all-status-queues=queue1,queue2,queue3,queue4
Viruses.
#4 · 2019-05-15 00:11

OK, I figured out how to get it to listen to multiple queues. I think there may be some downsides compared to my other solution, mainly that it doesn't work if a listed queue does not exist. I ended up using a totally different approach, based on @RabbitListener:

@Component
public class EventListener {

    private static Logger LOG = LoggerFactory.getLogger(EventListener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = "myqueue")
    public void processPaymentMessage(Object message) {
        LOG.info("Message is of type: " + message.getClass().getName());
        if(!(message instanceof byte[])) message = ((Message) message).getBody();
        String content = new String((byte[])message, StandardCharsets.UTF_8);
        LOG.info("Received on myqueue: " + content);
        latch.countDown();
    }

    @RabbitListener(queues = "myotherqueue")
    public void processOrderMessage(Object message) {
        LOG.info("Message is of type: " + message.getClass().getName());
        if(!(message instanceof byte[])) message = ((Message) message).getBody();
        String content = new String((byte[])message, StandardCharsets.UTF_8);           
        LOG.info("Received on myotherqueue: " + content);
        latch.countDown();
    }   
}

The check for byte[] is in there because that is what a message sent from the command line looks like. Otherwise it is an org.springframework.amqp.core.Message.
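
Since both listener methods repeat the same byte[]-or-Message check, it could be pulled into one helper. Below is a minimal sketch of that idea in plain Java, so it runs without Spring on the classpath. PayloadDecoder and HasBody are hypothetical names, and HasBody is only a stand-in for org.springframework.amqp.core.Message, which exposes getBody() in the same way. In the real listener you would test against Message instead.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoder {

    // Hypothetical stand-in for org.springframework.amqp.core.Message,
    // used here only so the sketch compiles without Spring AMQP.
    interface HasBody {
        byte[] getBody();
    }

    // Returns the payload as a UTF-8 string, whether it arrives as raw bytes
    // or wrapped in a message object.
    static String decode(Object message) {
        byte[] body;
        if (message instanceof byte[]) {
            body = (byte[]) message;
        } else if (message instanceof HasBody) {
            body = ((HasBody) message).getBody();
        } else {
            throw new IllegalArgumentException(
                    "Unsupported payload type: " + message.getClass().getName());
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        HasBody wrapped = () -> raw;
        System.out.println(decode(raw));      // raw bytes path
        System.out.println(decode(wrapped));  // wrapped message path
    }
}
```

Each @RabbitListener method would then shrink to a call to decode(message) plus its own logging.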
