ActiveMQ redelivery does not work

2019-03-02 16:59发布

问题:

I am trying the implement a dead letter queue using ActiveMQ. Unfortunately the documentation on this end is rather vague on some aspects and I can't seem to get everything properly set up.

I have the following Beans configured:

@Bean
public JmsTemplate createJMSTemplate() {
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;
}

And this is my receiving code:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException {
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue '{}'", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();
}

@PreDestroy
private void destroy() throws JMSException {
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();
}

private class SegmentReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        logger.info("onMessage");
        try {
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) {
                throw new IOException("This segment is expected to fail");
            }
            System.out.println(segment.getText());
            message.acknowledge();
        }
        catch(IOException | JMSException exception) {
            logger.error(exception.toString());
            try {
                QueueReceiver.this.session.rollback();
            } catch (JMSException e) {
                logger.error(e.toString());
            }
            throw new RuntimeException(exception);
        }
    }

}

However, nothing happens. I am using an out-of-the-box Apache ActiveMQ 5.14.2 setup using the default configuration. What am I missing here?

回答1:

because you are using this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
calling message.acknowledge(); is the same than calling session.acknowledge(); .

to have ActiveMQ redelivery working successfully with your config, there is some possibilities with minimal changes:

  1. calling QueueReceiver.this.session.recover();
    in place of calling QueueReceiver.this.session.rollback();

void org.apache.activemq.ActiveMQSession.recover() throws JMSException

Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.

All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all messages that have been delivered to the client.

Restarting a session causes it to take the following actions: •Stop message delivery •Mark all messages that might have been delivered but not acknowledged as "redelivered" •Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.

  1. use this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); and call ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge(); , note that not calling this method is like a rollback, means the message is not acknowledged and throwing an exception in onMessage method will call QueueReceiver.this.consumer.rollback(); of org.apache.activemq.ActiveMQMessageConsumer.rollback().

  2. simply calling QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback() in place of calling QueueReceiver.this.session.rollback();



回答2:

So it proved to be a combination of issues:

  • The session acknowledge mode needed to be set to ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE
  • I'm using session.recover() rather than rollback()
  • The ActiveMQ broker wasn't properly configured. I needed to add this bit to the activemq.xml configuration file (place it under the <broker> tag).

        <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained
                     For more information, see:
    
                     http://activemq.apache.org/slow-consumer-handling.html
    
                -->
              <pendingMessageLimitStrategy> 
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
            <!-- Set the following policy on all queues using the '>' wildcard -->
            <policyEntry queue=">">
                <deadLetterStrategy>
                    <!--
                      Use the prefix 'DLQ.' for the destination name, and make
                      the DLQ a queue rather than a topic
                    -->
                    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                </deadLetterStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
    
  • Make sure you didn't active any redeliveryPlugin that can mess with your ActiveMQConnectionFactory configuration.