I use the following application to connect to my oracle database, register a queue listener and wait for enqueued messages:
package sample;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class MyThread extends Thread {
private static final String QUEUE_NAME = "MY_QUEUE";
private static final String QUEUE_USER = "myuser";
private static final String QUEUE_PW = "mypassword";
private boolean running;
public MyThread() {
this.running = true;
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
private QueueConnection getQueueConnection() throws JMSException {
QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
QueueConnection QCon = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
return QCon;
}
@Override
public void interrupt() {
this.running = false;
super.interrupt();
}
@Override
public void run() {
try {
QueueConnection queueConnection = getQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = ((AQjmsSession) queueSession).getQueue(QUEUE_USER, QUEUE_NAME);
while (running) {
System.out.println("Starting...");
queueConnection.start();
MessageConsumer mq = ((AQjmsSession) queueSession).createReceiver(queue);
MyListener listener = new MyListener();
mq.setMessageListener(listener);
System.out.println("... Done, now sleep a bit and redo");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Closing Application");
queueSession.close();
queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Once a message got enqueued the onMessage function will append a message into a textfiles content:
package sample;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.Message;
import javax.jms.MessageListener;
public class MyListener implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter("C:/temp/output/messages.txt", true)));
out.println("New Message arrived");
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
On runtime my console output looks like this:
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
If there where any enqueues the file will contain the new messages.
So: I can dequeue Events and the onMessage event will be triggered with this code.
Now to my question: I'm pretty sure waiting 5 seconds to register the Listener again (and call queueConnection.start() to receive the onMessage calls) is not the correct approach. But if I don't do this there won't be any onMessage events (File stays empty).
What is the correct approach to start listening to a queue infinitely without a fixed Thread.sleep() call and without the need to register the listener again even if there weren't any events?
Additional Information
Database: Oracle 11g2
Java Runtime: 1.6
Maven Dependencies:
- oracle-jdbc (11.2.0.4.0)
- xdb (1.0)
- aqapi (1.3)
- jmscommon (1.3.1_02)
There's no reason to run a thread to create a JMS consumer and set its message listener. The whole point of a JMS message listener is to receive message asynchronously (functionality which you appear to be trying to duplicate for some reason).
You simply need to create the JMS consumer and set the message listener and then ensure the consumer isn't closed. Depending on how the application is written it is sometimes necessary to have a
while
loop to make sure the program doesn't terminate and therefore close the consumer. Your thread isn't doing that. It's letting the consumer fall out of scope after waiting for messages for 5 seconds which means that it will be garbage collected and I expect for most JMS implementations that means it will be closed. It could be worse than that, though. By not explicitly closing the consumer and just letting it fall out of scope you could be leaking consumers which would eventually bog down your message broker. This is not only sloppy programming, but potentially problematic for other users trying to consume messages.