I am new to jms. The goal is to process messages concurrently from a queue in an asynchronous listener's onMessage method by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing.
1) Is it possible to process messages concurrently from a single queue by creating multiple consumers ? 2) I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish.
public class QueueConsumer implements Runnable, MessageListener {
public static void main(String[] args) {
QueueConsumer consumer1 = new QueueConsumer();
QueueConsumer consumer2 = new QueueConsumer();
try {
consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
} catch (JMSException ex) {
ex.printStackTrace();
System.exit(-1);
}
Thread newThread1 = new Thread(consumer1);
Thread newThread2 = new Thread(consumer1);
newThread1.start();
newThread2.start();
}
private static String connectionFactoryName = null;
private static String queueName = null;
private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;
private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;
public static final Logger logger = LoggerFactory
.getLogger(QueueConsumer.class);
public QueueConsumer() {
super();
}
public void onMessage(Message msg) {
if (msg instanceof TextMessage) {
try {
//process message
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public void run() {
try {
queueConnection.start();
} catch (JMSException e) {
e.printStackTrace();
System.exit(-1);
}
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
try {
wait();
} catch (InterruptedException ex) {
break;
}
}
}
}
public void init(String factoryName, String queue2) throws JMSException {
try {
qcf = new JMSConnectionFactory(factoryName);
queueConnection = qcf.createConnection();
ses = queueConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
queue = ses.createQueue(queue2);
logger.info("Subscribing to destination: " + queue2);
msgConsumer = ses.createConsumer(queue);
msgConsumer.setMessageListener(this);
System.out.println("Listening on queue " + queue2);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
private static void setConnectionFactoryName(String name) {
connectionFactoryName = name;
}
private static String getQueueName() {
return queueName;
}
private static void setQueueName(String name) {
queueName = name;
}
}
I only took a brief look and I noticed that you pass the wrong consumer to your second thread:
beside of this, some variables such as
ConnectionFactory
are static and initialized multiple times/overriden. You only need one connection that could create multiple sessions and/or consumers.Related to the code example you provided, It is not recommanded by Oracle to create low-level threads on a deployed application. Example for Weblogic : Using Threads in WebLogic Server
Instead in the applicationcontext.xml, where you have made the bean of mail container, you can add concurrent consumer property which would be a better approach.