I am using the below code to create multiple JMS sessions for multiple consumers to consume messages. My problem is that the code is running in a single threaded fashion. Even if messages are present in the Queue the Second thread is unable to receive anything and just keeps polling. The first thread meanwhile finishes processing the first batch and comes back and consumes the remaining messages. Is there anything wrong with the usage here ?
static {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616");
connection = connectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
LOGGER.error("Unable to initialise JMS Queue.", e);
}
}
public JMSClientReader(boolean isQueue, String name) throws QueueException {
init(isQueue,name);
}
@Override
public void init(boolean isQueue, String name) throws QueueException
{
// Create a Connection
try {
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (isQueue) {
destination = new ActiveMQQueue(name);// session.createQueue("queue");
} else {
destination = new ActiveMQTopic(name);// session.createTopic("topic");
}
consumer = session.createConsumer(destination);
} catch (JMSException e) {
LOGGER.error("Unable to initialise JMS Queue.", e);
throw new QueueException(e);
}
}
public String readQueue() throws QueueException {
// connection.setExceptionListener(this);
// Wait for a message
String text = null;
Message message;
try {
message = consumer.receive(1000);
if(message==null)
return "done";
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
LOGGER.info("Received: " + text);
} else {
throw new JMSException("Invalid message found");
}
} catch (JMSException e) {
LOGGER.error("Unable to read message from Queue", e);
throw new QueueException(e);
}
LOGGER.info("Message read is " + text);
return text;
}
your problem is the prefetchPolicy.
all messages was dispatched to the first connected consumer and when another one connects he don't receive messages, so to change this behavior if you have concurrent consumer for a queue you need to set prefetchPolicy to a lower value than default. for example add this
jms.prefetchPolicy.queuePrefetch=1
to the uri config in activemq.xml or set it on the client url like thisTake a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html
And
http://activemq.apache.org/destination-options.html