I'm trying to make a JMS MessageConsumer survive ActiveMQ restarts, so that it reconnects through the Failover Transport.
However, it terminates as soon as ActiveMQ shuts down.
This looks like a bug that was reported and marked "resolved", but I'm still seeing it in the latest version of ActiveMQ (5.10.0).
I used the following Maven dependency:
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
Here is some sample code:
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnectionFactory;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SimpleConsumer {
        public static void main(String[] args) throws Exception {
            String url = "failover:(tcp://ACTIVE_MQ_HOST:61616)";
            String destination = "test-topic";
            TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(destination);
            MessageConsumer consumer = session.createConsumer(topic);
            connection.start();

            // Uncomment this blocking loop and comment out the listener below, and it works:
            // while (true) {
            //     Message msg = consumer.receive();
            //     if (msg instanceof TextMessage) {
            //         System.out.println("msg received = " + msg);
            //     }
            // }

            // Asynchronous delivery: main() returns right after this call.
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    System.out.println("msg received = " + msg);
                }
            });
        }
    }
I would like it to work with a MessageListener, since that is non-blocking and asynchronous.
Any help with this is greatly appreciated.
One thing I have already tried, as suggested in the JIRA issue mentioned above, is running this in a non-daemon thread, but that didn't work.
This is what I tried:
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnectionFactory;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SimpleConsumerThread {
        public static void main(String[] args) throws Exception {
            Thread t = new Thread() {
                public void run() {
                    try {
                        String url = "failover:(tcp://ACTIVEMQ_HOST:61616)";
                        String destination = "test-topic";
                        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Topic topic = session.createTopic(destination);
                        MessageConsumer consumer = session.createConsumer(topic);
                        connection.start();
                        // run() returns right after the listener is registered.
                        consumer.setMessageListener(new MessageListener() {
                            public void onMessage(Message msg) {
                                System.out.println("msg received = " + msg);
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.setDaemon(false);
            t.start();
        }
    }
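For reference, here is a minimal, broker-free sketch of the threading behaviour that may be relevant here. It rests on an assumption not confirmed in this post: that while the broker is down, the only client threads still running are daemon threads. The JVM exits once no non-daemon thread is running, and a non-daemon thread such as `t` above stops counting as soon as its `run()` returns, which happens right after `setMessageListener`. `KeepAliveSketch` and `startDaemonWorker` are hypothetical names; the worker only stands in for background reconnect work and uses no JMS classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KeepAliveSketch {

    // Hypothetical stand-in for the JMS setup: starts a daemon worker
    // (playing the role of background reconnect work) and returns at once.
    static Thread startDaemonWorker(final CountDownLatch workDone) {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(200); // pretend to wait for the broker to come back
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                workDone.countDown();
            }
        });
        worker.setDaemon(true); // a daemon thread alone does not keep the JVM alive
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch workDone = new CountDownLatch(1);
        startDaemonWorker(workDone);

        // Blocking the non-daemon main thread keeps the JVM running.
        // If main returned here instead, the JVM could exit before the worker finishes.
        boolean finished = workDone.await(5, TimeUnit.SECONDS);
        System.out.println("worker finished = " + finished);
    }
}
```

In the consumer above, the equivalent would be an untimed `await()` on a latch, placed after `setMessageListener(...)`, so that one non-daemon thread stays blocked while the listener keeps handling messages asynchronously. Whether this is enough to survive a broker restart still depends on the assumption stated above.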