I am trying the implement a dead letter queue using ActiveMQ. Unfortunately the documentation on this end is rather vague on some aspects and I can't seem to get everything properly set up.
I have the following Beans configured:
@Bean
public JmsTemplate createJMSTemplate() {
logger.info("createJMSTemplate");
JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
jmsTemplate.setDefaultDestinationName(queue);
jmsTemplate.setDeliveryPersistent(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-10");
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
// Configure the ActiveMQConnectionFactory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));
// Configure the redeliver policy and the dead letter queue
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setRedeliveryDelay(10000);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(3);
RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
And this is my receiving code:
@Autowired
private ConnectionFactory connectionFactory;
private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;
@PostConstruct
private void init() throws JMSException, InterruptedException {
logger.info("Initializing QueueReceiver...");
this.connection = connectionFactory.createConnection();
this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue q = session.createQueue(queue);
logger.info("Creating consumer for queue '{}'", q.getQueueName());
MessageConsumer consumer = session.createConsumer(q);
this.callback = new SegmentReceiver();
consumer.setMessageListener(callback);
this.connection.start();
}
@PreDestroy
private void destroy() throws JMSException {
logger.info("Destroying QueueReceiver...");
this.session.close();
this.connection.close();
}
private class SegmentReceiver implements MessageListener {
@Override
public void onMessage(Message message) {
logger.info("onMessage");
try {
TextMessage textMessage = (TextMessage) message;
Segment segment = Segment.fromJSON(textMessage.getText());
if (segment.shouldFail()) {
throw new IOException("This segment is expected to fail");
}
System.out.println(segment.getText());
message.acknowledge();
}
catch(IOException | JMSException exception) {
logger.error(exception.toString());
try {
QueueReceiver.this.session.rollback();
} catch (JMSException e) {
logger.error(e.toString());
}
throw new RuntimeException(exception);
}
}
}
However, nothing happens. I am using an out-of-the-box Apache ActiveMQ 5.14.2 setup using the default configuration. What am I missing here?
So it proved to be a combination of issues:
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE
session.recover()
rather thanrollback()
The ActiveMQ broker wasn't properly configured. I needed to add this bit to the activemq.xml configuration file (place it under the
<broker>
tag).Make sure you didn't active any
redeliveryPlugin
that can mess with your ActiveMQConnectionFactory configuration.because you are using
this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
calling
message.acknowledge();
is the same than callingsession.acknowledge();
.to have ActiveMQ redelivery working successfully with your config, there is some possibilities with minimal changes:
QueueReceiver.this.session.recover();
in place of calling
QueueReceiver.this.session.rollback();
use
this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
and call((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge();
, note that not calling this method is like a rollback, means the message is not acknowledged and throwing an exception inonMessage
method will callQueueReceiver.this.consumer.rollback();
of org.apache.activemq.ActiveMQMessageConsumer.rollback().simply calling
QueueReceiver.this.consumer.rollback();
org.apache.activemq.ActiveMQMessageConsumer.rollback() in place of callingQueueReceiver.this.session.rollback();