Multiple listeners that are durable. Do they work

2019-08-19 03:53发布

问题:

My configuration:

@Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        return connectionFactory;
    }
 @Bean
    public DefaultMessageListenerContainer listenerContainers() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        //container.setConnectionFactory(connectionFactory1());
        container.setClientId("consumer1");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainers1() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setClientId("consumer2");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    } 

I need the messages to be published to multiple listeners. All the listeners execute the same code. I want them to be durable. I have put setsessiontransacted true. It is a pub/sub model.

My idea is that if one listener will execute the code. Other listeners can send simply send an acknowledgement. This way they can get another message.

My assumption here: The broker sends a message to both the listeners. One of them acknowledges immediately, while the other processes it.
Now the broker got another message. Since the 1st listener didnt send an acknowledgement, it will send the message to the 2nd listener and it puts the message in a queue for 1st listener so that it can send whenever the 1st listener acknowledges the previous message.

My important doubt here: Does the activemq broker send another message without all the listeners acknowledging ?

I think the concept is that each listener will have a queue maintained in the broker. As the broker get messages, it will push messages into the queue of each individual listener. If the listener is idle, it will take the message. If it is busy processing, until the acknowledgement is sent, the message will stay in the queue. After the acknowledgement, next message will be delievered to the listener.

I am only saying this in context of properties I have, durable subscribers, setsession transacted true.

What I have tried and failed. I tried to set the concurrent consumer property to 2 and also set it to durable subscriber. Looks like if it is a durable subscriber, it needs a unique client ID. So I switched to using multiple containers with concurrent consumer property 1.

EDIT : Everything I said here is in the context of my configuration which is using durable subscribers, setsessiontransacted true and same message listener

回答1:

My assumption here: The broker sends a message to both the listeners. One of them acknowledges immediately, while the other processes it. Now the broker got another message. Since the 1st listener didnt send an acknowledgement, it will send the message to the 2nd listener and it puts the message in a queue for 1st listener so that it can send whenever the 1st listener acknowledges the previous message.

It doesn't work that way at all, the consumers/subscriptions are independent of each other. There is no "queue" for each user; just the topic; with durable subscriptions, the broker keeps track of the last message sent a consumer; when all durable subscriptions have received the message, it is deleted.

The actual process of sending the message to the consumer depends on other factors, for example, ActiveMQ supports prefetch (default 1000) which means it will send up to that number without waiting for acks.

You MUST use sessionTransacted with the DMLC so the ack is not committed until your listener completes.

concurrent consumer property to 2

As I said in the answer to your other question, increasing the concurrency makes no sense when consuming from a topic.