ActiveMQ: how to dequeue older messages?

Posted 2019-05-06 16:54

Question:

I'm learning how to use ActiveMQ and I am facing the following problem.

Suppose that I have a topic named topic.test on ActiveMQ which has two subscribers. At a given moment, only one of those subscribers is connected and waiting for messages, and a producer sends a message to that topic.

OK, the connected subscriber gets the message, but shouldn't the other subscriber receive that message later, when it connects? In my case that is not happening: my subscribers only receive messages while they are connected. Messages sent while they were disconnected are never delivered to them. What could I be doing wrong?

Here is some of the source code I wrote to test ActiveMQ. Maybe you could find what is wrong with it.

My consumer code:

        // Connect to the local broker and identify this client.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("leitorTeste");
        connection.start();
        Session sessao = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic fonte = sessao.createTopic("topic.test");
        // Plain (non-durable) topic consumer.
        MessageConsumer consumer = sessao.createConsumer(fonte);
        javax.jms.Message presente = null;
        // receive() blocks until a message arrives.
        while ((presente = consumer.receive()) != null) {
            System.out.println(((TextMessage) presente).getText());
        }
        consumer.setMessageListener(new LeitorMensagens());
        connection.close();

And here is my producer code:

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        // Transacted session: the message is only sent on commit().
        Session sessao = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Destination destino = sessao.createTopic("topic.test");
        MessageProducer produtorMensagem = sessao.createProducer(destino);
        produtorMensagem.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = sessao.createTextMessage("Hi!");
        produtorMensagem.send(message);
        sessao.commit();
        connection.close();

Is there any other configuration I should add to ActiveMQ so that my consumers can get older messages?

Answer 1:

You must make your consumers "permanent" (durable, in JMS terms). Otherwise, AMQ "forgets" about them as soon as they disconnect. To do this, use Session.createDurableSubscriber() instead of Session.createConsumer().
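
As a rough sketch of what that could look like for the consumer in the question, assuming the same broker URL, client ID and topic name: the subscription name "leitorTesteSub" and the 5-second receive timeout are made up for illustration. A durable subscription is identified by the client ID together with the subscription name, so both need to stay the same across runs.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class DurableReader {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        // The client ID must be set before the connection is used;
        // the broker uses it to recognise this subscriber when it reconnects.
        connection.setClientID("leitorTeste");
        connection.start();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("topic.test");
            // "leitorTesteSub" is a hypothetical subscription name.
            TopicSubscriber subscriber =
                    session.createDurableSubscriber(topic, "leitorTesteSub");

            Message message;
            // Wait up to 5 seconds per message (arbitrary timeout) and stop
            // once nothing more arrives within that window.
            while ((message = subscriber.receive(5000)) != null) {
                if (message instanceof TextMessage) {
                    System.out.println(((TextMessage) message).getText());
                }
            }
        } finally {
            connection.close();
        }
    }
}
```

Note that the subscriber has to run once before the producer sends anything, so that the broker knows the subscription exists. Only messages published after that first registration are kept for it.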



Answer 2:

There is also something called a retroactive consumer, which works together with a policy you can set on the broker. This is for topic subscribers that aren't durable but may wish to receive 'recent' messages they may have missed. See also Subscription Recovery Policy.
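
A minimal sketch of the client side, assuming the broker URL and topic name from the question: ActiveMQ accepts consumer options as a query string on the destination name, and `consumer.retroactive=true` asks for a retroactive consumer. Which messages are actually replayed, if any, depends on the subscription recovery policy configured on the broker, which is not shown here. The 5-second receive timeout is an arbitrary choice for the example.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class RetroactiveReader {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // The destination option marks this (non-durable) consumer as retroactive.
            Topic topic = session.createTopic("topic.test?consumer.retroactive=true");
            MessageConsumer consumer = session.createConsumer(topic);

            Message message;
            // Print whatever the broker replays, then any new messages,
            // until nothing arrives for 5 seconds.
            while ((message = consumer.receive(5000)) != null) {
                if (message instanceof TextMessage) {
                    System.out.println(((TextMessage) message).getText());
                }
            }
        } finally {
            connection.close();
        }
    }
}
```

Unlike a durable subscription, this needs no client ID or subscription name, but it also gives no delivery guarantee: the consumer only sees whatever the broker's recovery policy has chosen to retain.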