I want to receive chunk of messages from Queue within some timeLimit(Ex : 300 millisec after receiving the 1st message) using DefaultMessageListenerConatiner Of Spring (By overriding doReceiveAndExecute) as mentioned in the link.
I can group the messages of my batch size i.e 20 when the queue is having too many messages and I can receive less than 20 messages when there are very less messages in Queue.
Issue :
I see it takes too much time(sometimes 1 sec and sometime 2 secs and more) for sending the messages to Listener even when the queue is full.
When I try with DefaultMessageListenerConatiner as such to receive single messages concurrently, I see the messages are received in a delay of few milliseconds(like 1 millisec or max 30 to 60 millisec)
I didn't specify transactionTimeout or receiveTimeout and I didn't link any transactionManager as well.
Can Springers please help me to find where the timeOut can be specified or How can I redeuce the time delay?
BatchMessageListenerContainer :
package com.mypackage;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.List;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.TransactionStatus;
/**
* Listener Container that allows batch consumption of messages. Works only with transacted sessions
*/
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
public static final int DEFAULT_BATCH_SIZE = 20;
private int batchSize = DEFAULT_BATCH_SIZE;
public BatchMessageListenerContainer() {
super();
setSessionTransacted(true);
}
/**
* @return The batch size on this container
*/
public int getBatchSize() {
return batchSize;
}
/**
* @param batchSize The batchSize of this container
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
* The doReceiveAndExecute() method has to be overriden to support multiple-message receives.
*/
@Override
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
TransactionStatus status) throws JMSException {
Connection conToClose = null;
MessageConsumer consumerToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = session;
MessageConsumer consumerToUse = consumer;
if (sessionToUse == null) {
Connection conToUse = null;
if (sharedConnectionEnabled()) {
conToUse = getSharedConnection();
}
else {
conToUse = createConnection();
conToClose = conToUse;
conToUse.start();
}
sessionToUse = createSession(conToUse);
sessionToClose = sessionToUse;
}
if (consumerToUse == null) {
consumerToUse = createListenerConsumer(sessionToUse);
consumerToClose = consumerToUse;
}
List<Message> messages = new ArrayList<Message>();
int count = 0;
Message message = null;
// Attempt to receive messages with the consumer
do {
message = receiveMessage(consumerToUse);
if (message != null) {
messages.add(message);
}
}
// Exit loop if no message was received in the time out specified, or
// if the max batch size was met
while ((message != null) && (++count < batchSize));
if (messages.size() > 0) {
// Only if messages were collected, notify the listener to consume the same.
try {
doExecuteListener(sessionToUse, messages);
sessionToUse.commit();
}
catch (Throwable ex) {
handleListenerException(ex);
if (ex instanceof JMSException) {
throw (JMSException) ex;
}
}
return true;
}
// No message was received for the period of the timeout, return false.
noMessageReceived(invoker, sessionToUse);
return false;
}
finally {
JmsUtils.closeMessageConsumer(consumerToClose);
JmsUtils.closeSession(sessionToClose);
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
}
}
protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received messages because of the listener container "
+ "having been stopped in the meantime: " + messages);
}
rollbackIfNecessary(session);
throw new JMSException("Rejecting received messages as listener container is stopping");
}
@SuppressWarnings("unchecked")
SessionAwareBatchMessageListener<Message> lsnr = (SessionAwareBatchMessageListener<Message>) getMessageListener();
try {
lsnr.onMessages(session, messages);
}
catch (JMSException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (RuntimeException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (Error err) {
rollbackOnExceptionIfNecessary(session, err);
throw err;
}
}
@Override
protected void checkMessageListener(Object messageListener) {
if (!(messageListener instanceof SessionAwareBatchMessageListener<?>)) {
throw new IllegalArgumentException("Message listener needs to be of type ["
+ SessionAwareBatchMessageListener.class.getName() + "]");
}
}
@Override
protected void validateConfiguration() {
if (batchSize <= 0) {
throw new IllegalArgumentException("Property batchSize must be a value greater than 0");
}
}
public void setSessionTransacted(boolean transacted) {
if (!transacted) {
throw new IllegalArgumentException("Batch Listener requires a transacted Session");
}
super.setSessionTransacted(transacted);
}
}
SessionAwareBatchMessageListener:
package com.mypackage;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
public interface SessionAwareBatchMessageListener<M extends Message> {
/**
* Perform a batch action with the provided list of {@code messages}.
*
* @param session JMS {@code Session} that received the messages
* @param messages List of messages
* @throws JMSException JMSException thrown if there is an error performing the operation.
*/
public void onMessages(Session session, List<M> messages) throws JMSException;
}
Bean in applicationContext.xml:
<bean id="myMessageListener" class="org.mypackage.MyMessageListener">
<bean id="jmsContainer" class="com.mypackage.BatchMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" ref="queue"/>
<property name="messageListener" ref="myMessageListener"/>
<property name ="concurrentConsumers" value ="10"/>
<property name ="maxConcurrentConsumers" value ="50"/>
</bean>
MyMessageListner :
package org.mypackage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.mypackage.service.MyService;
public class MyMessageListener implements SessionAwareBatchMessageListener<TextMessage> {
@Autowired
private MyService myService;
@Override
public void onMessage(Session session, List<TextMessage> messages) {
try {
for(TextMessage tm :messages) {
TextMessage textMessage = (TextMessage) message;
// parse the message and add to list
}
//process list of Objects to DB
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}