JEE7 + WildFly (HornetQ) - Pause queue from applic

Posted 2020-07-23 09:10

Question:

We are using WildFly + HornetQ as our application server and JMS message queue, and have the requirement to be able to pause/resume queues from the application. Is this possible?

Answer 1:

Yes. This can be done either through JMX or through the HornetQ core management API.

For this example, WildFly 8.1.0.Final was used, running the standalone-full-ha profile.

Required Maven Dependencies:

    <dependency>
        <groupId>org.hornetq</groupId>
        <artifactId>hornetq-jms-client</artifactId>
        <version>2.4.1.Final</version>
    </dependency>

    <dependency>
        <groupId>org.wildfly</groupId>
        <artifactId>wildfly-jmx</artifactId>
        <version>8.1.0.Final</version>
    </dependency>

Here is a test class demonstrating the use of JMSQueueControl via JMX:

package test.jmx.hornetq;

import org.hornetq.api.jms.management.JMSQueueControl;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class WildflyJmsControl {

    public static void main(String[] args) {
        // Connect to the WildFly 8 MBean server on localhost
        String host = "localhost";
        int port = 9990;  // management HTTP port
        String urlString = System.getProperty("jmx.service.url", "service:jmx:http-remoting-jmx://" + host + ":" + port);

        // try-with-resources closes the connector even if a management call fails
        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(urlString), null)) {
            MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

            String queueName = "testQueue"; // use your queue name here

            String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
            ObjectName objectName = ObjectName.getInstance(mbeanObjectName);

            // Typed proxy that forwards calls to the queue's MBean
            JMSQueueControl jmsQueueControl = JMX.newMBeanProxy(connection, objectName, JMSQueueControl.class);

            long msgCount = jmsQueueControl.countMessages(null); // null filter counts all messages
            System.out.println(mbeanObjectName + " message count: " + msgCount);

            jmsQueueControl.pause();
            System.out.println("queue paused");

            jmsQueueControl.resume();
            System.out.println("queue resumed");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
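
The proxy mechanism used above can be tried without a running WildFly instance. The following is a minimal sketch using only the JDK: it registers a hypothetical stand-in MBean (DemoQueue, with the interface DemoQueueMBean; neither is part of HornetQ) on the platform MBean server, pauses it through a typed proxy, and resumes it through the untyped invoke() call. The untyped form may be useful when the HornetQ client classes are not on the classpath. The object name "demo:type=Queue,name=testQueue" is likewise an assumption made for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class JmxPauseResumeSketch {

    // Hypothetical management interface; it only mimics the pause/resume/isPaused
    // operations that the answer calls on HornetQ's JMSQueueControl.
    public interface DemoQueueMBean {
        void pause();
        void resume();
        boolean isPaused();
    }

    // Hypothetical stand-in for a queue; it just tracks a paused flag.
    public static class DemoQueue implements DemoQueueMBean {
        private volatile boolean paused;
        @Override public void pause() { paused = true; }
        @Override public void resume() { paused = false; }
        @Override public boolean isPaused() { return paused; }
    }

    /** Returns "<paused after pause()>,<paused after resume()>". */
    static String runDemo() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("demo:type=Queue,name=testQueue");
            server.registerMBean(new DemoQueue(), name);
            try {
                // Typed access: the proxy turns interface calls into MBean operations.
                DemoQueueMBean proxy = JMX.newMBeanProxy(server, name, DemoQueueMBean.class);
                proxy.pause();
                boolean afterPause = proxy.isPaused();

                // Untyped access: the same operation invoked by name, no interface needed.
                server.invoke(name, "resume", new Object[0], new String[0]);
                boolean afterResume = (Boolean) server.getAttribute(name, "Paused");

                return afterPause + "," + afterResume;
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Against a real server, the same two calling styles would apply to the MBeanServerConnection obtained from the JMXConnector, with the jboss.as object name shown in the answer.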

To access HornetQ management through the core management API instead (by sending management messages over a core client session), use:

package test.jms.hornetq;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;

public class HornetqService {

    public void testPauseResumeQueue() {
        // The in-VM connector only works when this class runs in the same JVM as the WildFly server (not in a remote JVM)
        try {
            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
                    InVMConnectorFactory.class.getName()));

            ClientSession session = locator.createSessionFactory().createSession();

            session.start();

            ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");

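            // Use your queue name here. If the resource is not found, the name may need
            // HornetQ's JMS queue resource prefix, e.g. "jms.queue.testQueue".
            String queueName = "testQueue";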

            // get queue message count
            ClientMessage message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "messageCount");

            ClientMessage reply = requester.request(message);
            int count = (Integer) ManagementHelper.getResult(reply);
            System.out.println("There are " + count + " messages in " + queueName);

            // pause the queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "pause");

            requester.request(message);

            // check that the queue now reports itself as paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result =  ManagementHelper.getResult(reply);
            System.out.println("result: " + result.getClass().getName() + " : " + result.toString());

            // resume queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "resume");
            requester.request(message);

            // check that the queue no longer reports itself as paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result2 =  ManagementHelper.getResult(reply);
            System.out.println("result2: " + result2.getClass().getName() + " : " + result2.toString());

            requester.close();

            session.close();
        } catch (Exception e) {
            System.out.println("Error pausing queue: " + e.getMessage());
        }
    }
}


Answer 2:

Are you looking for a way to stop and start delivery of messages? If so, JMS defines the Connection.stop() method to pause delivery of incoming messages on a connection. Delivery can be resumed with Connection.start(). Note that this only affects consumers created from that connection; it does not pause the queue itself on the server.

The HornetQ JMS client implements these methods, so you can call them on your own connection.