JMS MessageConsumer Using MessageListener Terminat

Posted 2019-05-31 22:14

I'm trying to make a JMS MessageConsumer survive ActiveMQ restarts by reconnecting through the Failover Transport.

However, the consumer program terminates as soon as ActiveMQ shuts down.

This looks like a bug that was reported and "resolved", but I'm still seeing it in the latest version, ActiveMQ 5.10.0.

I used the following Maven dependency:

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>

Here is some sample code using a MessageListener:

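import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; // only needed by the commented-out receive() loop
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        String url = "failover:(tcp://ACTIVE_MQ_HOST:61616)";
        String destination = "test-topic";

        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory
                .createConnection();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(destination);

        MessageConsumer consumer = session.createConsumer(topic);
        connection.start();

        // Uncomment these lines and comment out the listener below and it will work,
        // because the blocking receive() keeps the main thread alive.
        // while (true) {
        //     Message msg = consumer.receive();
        //     if (msg instanceof TextMessage) {
        //         System.out.println("msg received = " + msg);
        //     }
        // }

        // Asynchronous delivery: main() returns right after registering the listener.
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message msg) {
                System.out.println("msg received = " + msg);
            }
        });
    }

}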

I would like it to work with a MessageListener, as it is non-blocking and asynchronous.

Any help with this is greatly appreciated.

One thing I have already tried, as suggested in the JIRA issue mentioned above, is to run this in a non-daemon thread, but that didn't work.

I tried this:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SimpleConsumerThread {

    public static void main(String[] args) throws Exception {

        Thread t = new Thread() {
            public void run() {
                try {
                    String url = "failover:(tcp://ACTIVEMQ_HOST:61616)";
                    String destination = "test-topic";

                    TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

                    ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();

                    Session session = connection.createSession(false,
                            Session.AUTO_ACKNOWLEDGE);

                    Topic topic = session.createTopic(destination);

                    MessageConsumer consumer = session.createConsumer(topic);
                    connection.start();
                    consumer.setMessageListener(new MessageListener() {
                        public void onMessage(Message msg) {
                            System.out.println("msg received = " + msg);
                        }
                    });
                    // run() returns here, so this thread ends right after setup.
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
        t.setDaemon(false);

        t.start();

    }

}

2 Answers
Rolldiameter
#2 · 2019-05-31 22:35

Your threaded solution doesn't work because the thread terminates as soon as run() completes, which leaves you with no non-daemon thread running, just as before. It's not really a good idea to depend on the internal threading model of a third-party library to keep your application running.
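To see the first point in isolation, here is a minimal JDK-only sketch. No ActiveMQ is involved, and the class and method names are hypothetical; the Runnable merely stands in for setup code that registers a listener and returns.

```java
/** Hypothetical demo: a non-daemon thread stops being alive once run() returns. */
final class ThreadLifetimeDemo {

    /** Runs the setup on a non-daemon thread, waits for it, and reports isAlive(). */
    static boolean isAliveAfterRun(Runnable setup) {
        Thread t = new Thread(setup, "setup-thread");
        t.setDaemon(false); // a user thread, as in the question
        t.start();
        try {
            t.join(); // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.isAlive();
    }

    public static void main(String[] args) {
        boolean alive = isAliveAfterRun(
                () -> System.out.println("listener registered"));
        // The thread was non-daemon, but it is gone once its work is done.
        System.out.println("alive after run() returned: " + alive);
    }
}
```

Marking the thread as non-daemon only matters while it is still running, so something has to keep a user thread busy or blocked for the lifetime of the consumer.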

The best solution, which will work regardless of bugs or other configuration intricacies in the ActiveMQ client, is to use the while(true) sleep() paradigm to keep your main thread alive.
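If you'd rather not poll in a sleep loop, one possible variant is to park the main thread on a CountDownLatch until something asks the program to stop. This is only a sketch with JDK classes: KeepAlive, requestStop and awaitStop are hypothetical names, and the daemon thread below is a stand-in for whatever the client library runs in the background.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that blocks a user thread until a stop is requested. */
final class KeepAlive {
    private final CountDownLatch stop = new CountDownLatch(1);

    /** Releases any thread waiting in awaitStop(); safe to call repeatedly. */
    void requestStop() {
        stop.countDown();
    }

    /** Blocks until requestStop() is called or the thread is interrupted. */
    void awaitStop() {
        try {
            stop.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Waits up to the timeout; returns true only if a stop was requested. */
    boolean awaitStop(long timeout, TimeUnit unit) {
        try {
            return stop.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        KeepAlive keepAlive = new KeepAlive();

        // Stand-in for background library work: a daemon thread,
        // which cannot keep the JVM alive by itself.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                // fall through and request the stop anyway
            }
            keepAlive.requestStop();
        });
        worker.setDaemon(true);
        worker.start();

        keepAlive.awaitStop(); // main, a user thread, parks here
        System.out.println("stop requested, exiting");
    }
}
```

In the consumer program, the connection, session and listener would be set up before the awaitStop() call, and requestStop() could be triggered by whatever signals a deliberate shutdown in your application.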
叛逆
#3 · 2019-05-31 22:38

Thanks Tim,

Yes, that worked. I just added the following loop to keep at least one user thread alive so the program doesn't terminate.

    while(true) {
        Thread.sleep(1000);
    }

cheers,

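import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
// Assumption: log4j 1.x, whose Logger.getLogger(Class) matches the call below.
import org.apache.log4j.Logger;

public class SimpleConsumer {

    static Logger logger = Logger.getLogger(SimpleConsumer.class);

    public static void main(String[] args) throws Exception {
        String url = "failover:(tcp://sydapp057lx.fxdms.net:61615)";
        String destination = "test-topic";

        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory
                .createConnection();

        // Log connection-level failures reported by the client.
        connection.setExceptionListener(new ExceptionListener() {
            public void onException(JMSException e) {
                logger.debug("got exception = " + e);
            }
        });

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(destination);

        MessageConsumer consumer = session.createConsumer(topic);
        connection.start();

        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message msg) {
                logger.debug("msg received = " + msg);
            }
        });

        // Keep the main (user) thread alive so the JVM does not exit.
        while (true) {
            Thread.sleep(1000);
        }

    }

}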