Hi am working on wso2 esb and using Active MQ for message queue.
I have a simple service to place a message in which it call custom java class where it creates a tcp connection and drops a message in queue.
Java code looks like below
package in.esb.custommediators;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.json.JSONObject;
import org.json.XML;
public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle {
Connection connection;
Session session;
public boolean mediate(MessageContext msgCtx) {
log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START");
try {
boolean topic=false;
String jmsuri=""+msgCtx.getProperty("jmsuri");
String t=""+msgCtx.getProperty("topic");
if(t.isEmpty()){
topic=false;
}
else {
topic=Boolean.valueOf(t);
}
ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri);
connection = factory.createConnection();
connection.start();
log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection);
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination=null;
if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue"));
else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue"));
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume();
if(topic){
JSONObject obj=XML.toJSONObject(xml);
JSONObject ar=obj.getJSONObject("soapenv:Body");
ar.remove("xmlns:soapenv");
xml=ar.toString();
}
TextMessage message = session.createTextMessage(xml);
producer.send(message);
} catch (Exception e) {
log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage());
e.printStackTrace();
((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
handleException("Error while storing in the message store", msgCtx);
}
finally {
try {
session.close();
if (connection!=null){
log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection);
connection.close();
}
} catch (JMSException e) {
log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
e.printStackTrace();
}
}
return true;
}
@Override
public void destroy() {
// TODO Auto-generated method stub
}
@Override
public void init(SynapseEnvironment arg0) {
// TODO Auto-generated method stub
}
}
when i call this service to send a message in queue below logs get generated.
[2017-07-29 11:18:35,962] INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true}
As of now every thing is working good , But when two users try to submit message at the same tire some strange thing happen as shown below
[2017-07-29 11:43:11,948] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false}
[2017-07-29 11:43:11,963] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true}
[2017-07-29 11:43:12,068] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed
Active MQ is creating two connections but using one connection for both the calls and that one connection is getting closed in one of the service call and throwing already closed error in the other service call and the other connection is waiting forever in the connection list of active mq with active status true as shown in the below image and this is also seen in the ESB thread list.
This kind of connections pileup and causing hangs ESB server. Even if i reset this connections from Active MQ ESB threads carry this connection info and only after a restart of ESB the problem get fixed.