We are using WildFly + HornetQ as our application server and JMS message queue, and have the requirement to be able to pause/resume queues from the application. Is this possible?
Answer 1:
This can be done using JMX or using the HornetQ core management API.
For the purposes of this example, WildFly 8.1.0.Final was used, running the standalone-full-ha profile.
Required Maven Dependencies:
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-client</artifactId>
    <version>2.4.1.Final</version>
</dependency>
<dependency>
    <groupId>org.wildfly</groupId>
    <artifactId>wildfly-jmx</artifactId>
    <version>8.1.0.Final</version>
</dependency>
Here is a test class demonstrating the use of JMSQueueControl via JMX:
package test.jmx.hornetq;

import org.hornetq.api.jms.management.JMSQueueControl;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class WildflyJmsControl {

    public static void main(String[] args) {
        // Get a connection to the WildFly 8 MBean server on localhost
        String host = "localhost";
        int port = 9990; // management-web port
        String urlString = System.getProperty("jmx.service.url",
                "service:jmx:http-remoting-jmx://" + host + ":" + port);

        // try-with-resources closes the connector even if a management call fails
        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(urlString), null)) {
            MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();

            String queueName = "testQueue"; // use your queue name here
            String mbeanObjectName = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName;
            ObjectName objectName = ObjectName.getInstance(mbeanObjectName);

            // Typed proxy that forwards each call to the remote queue MBean
            JMSQueueControl jmsQueueControl = MBeanServerInvocationHandler.newProxyInstance(
                    connection, objectName, JMSQueueControl.class, false);

            // A null filter counts every message in the queue
            long msgCount = jmsQueueControl.countMessages(null);
            System.out.println(mbeanObjectName + " message count: " + msgCount);

            jmsQueueControl.pause();
            System.out.println("queue paused");

            jmsQueueControl.resume();
            System.out.println("queue resumed");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
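The typed proxy above needs the HornetQ client jar on the classpath for the JMSQueueControl interface. As a rough sketch of an alternative, the same no-argument operations can be invoked by name through plain JMX, which needs only the JDK. The names QueuePauseByName, invokeNoArg, DemoQueue and DemoQueueMBean are hypothetical and exist only for this illustration; DemoQueue is a local stand-in registered in the platform MBean server so the sketch runs without WildFly. Against a real server you would pass the MBeanServerConnection obtained from the JMXConnector instead, and it is an assumption that the server's MBean exposes operations named "pause" and "resume" (the proxy calls in the example above suggest it does).

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class QueuePauseByName {

    /** Builds the MBean name pattern used in the answer above for a JMS queue. */
    static ObjectName queueObjectName(String queueName) throws Exception {
        return ObjectName.getInstance(
                "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queueName);
    }

    /** Invokes a no-argument MBean operation (for example "pause" or "resume") by name. */
    static void invokeNoArg(MBeanServerConnection connection, ObjectName name, String operation)
            throws Exception {
        connection.invoke(name, operation, new Object[0], new String[0]);
    }

    /** Hypothetical stand-in for the server-side queue MBean (not part of HornetQ). */
    public interface DemoQueueMBean {
        void pause();
        void resume();
        boolean isPaused();
    }

    /** Minimal implementation that only tracks the paused flag. */
    public static class DemoQueue implements DemoQueueMBean {
        private volatile boolean paused;
        @Override public void pause() { paused = true; }
        @Override public void resume() { paused = false; }
        @Override public boolean isPaused() { return paused; }
    }

    public static void main(String[] args) throws Exception {
        // The platform MBean server stands in for the remote WildFly connection here
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = queueObjectName("testQueue");
        server.registerMBean(new DemoQueue(), name);
        try {
            invokeNoArg(server, name, "pause");
            System.out.println("paused: " + server.getAttribute(name, "Paused"));
            invokeNoArg(server, name, "resume");
            System.out.println("paused: " + server.getAttribute(name, "Paused"));
        } finally {
            server.unregisterMBean(name);
        }
    }
}
```

The trade-off is that name-based invocation loses compile-time checking: a misspelled operation name only fails at runtime, whereas the JMSQueueControl proxy catches it at compile time.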
To access HornetQ management through the core management API, by sending management messages to the management address, use:
package test.jms.hornetq;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;

public class HornetqService {

    public void testPauseResumeQueue() {
        // This class needs to run in the same JVM as the WildFly server (i.e. not a remote JVM)
        ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
                new TransportConfiguration(InVMConnectorFactory.class.getName()));
        try {
            ClientSessionFactory factory = locator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.start();
            ClientRequestor requester = new ClientRequestor(session, "jms.queue.hornetq.management");

            // Use your queue's management resource name here; a prefix such as
            // "jms.queue." may be required, depending on how the queue is registered
            String queueName = "testQueue";

            // Get the queue message count (read as Number, since the numeric type of the reply may vary)
            ClientMessage message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "messageCount");
            ClientMessage reply = requester.request(message);
            Number count = (Number) ManagementHelper.getResult(reply);
            System.out.println("There are " + count + " messages in " + queueName);

            // Pause the queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "pause");
            requester.request(message);

            // Check that the queue now reports itself as paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result = ManagementHelper.getResult(reply);
            System.out.println("result: " + result.getClass().getName() + " : " + result);

            // Resume the queue
            message = session.createMessage(false);
            ManagementHelper.putOperationInvocation(message, queueName, "resume");
            requester.request(message);

            // Check that the queue no longer reports itself as paused
            message = session.createMessage(false);
            ManagementHelper.putAttribute(message, queueName, "paused");
            reply = requester.request(message);
            Object result2 = ManagementHelper.getResult(reply);
            System.out.println("result2: " + result2.getClass().getName() + " : " + result2);

            requester.close();
            session.close();
            factory.close();
        } catch (Exception e) {
            System.out.println("Error pausing queue: " + e.getMessage());
        } finally {
            // Release the locator even if one of the management requests failed
            locator.close();
        }
    }
}
Answer 2:
Are you looking for a way to stop and start the delivery of messages? If so, JMS defines the Connection.stop() method to pause message delivery; delivery can be resumed with Connection.start().
The HornetQ JMS client implements these methods, so you can use them. Note that they only suspend delivery to consumers on that connection; the queue itself is not paused on the server.