ActiveMQ connection issue

Posted 2019-09-15 19:19

Question:

Hi, I am working on WSO2 ESB and using ActiveMQ as the message queue.

I have a simple service that places a message on a queue. It calls a custom Java class, which creates a TCP connection and drops the message on the queue.

The Java code looks like this:

package in.esb.custommediators;

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext; 
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;

import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.transport.nhttp.NhttpConstants;

import org.json.JSONObject;
import org.json.XML;

public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle { 

    Connection connection;
    Session session;

    public boolean mediate(MessageContext msgCtx) { 

        log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
                ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
                ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START"); 


         try {
             boolean topic=false;
            String jmsuri=""+msgCtx.getProperty("jmsuri");
            String t=""+msgCtx.getProperty("topic");
            if(t.isEmpty()){
                topic=false;
            }
            else {
                topic=Boolean.valueOf(t);
            }
            ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri);
            connection = factory.createConnection();
            connection.start();

            log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection);
            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination=null;
            if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue"));
            else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue"));
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume();

            if(topic){

                JSONObject obj=XML.toJSONObject(xml);
                JSONObject ar=obj.getJSONObject("soapenv:Body");
                ar.remove("xmlns:soapenv");
                xml=ar.toString();
            }
            TextMessage message = session.createTextMessage(xml);
            producer.send(message);


        } catch (Exception e) {

            log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage());
            e.printStackTrace();

            ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
            handleException("Error while storing in the message store", msgCtx);

        }
        finally {
            try {
                session.close();
                if (connection!=null){
                    log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection);
                    connection.close();
                }

            } catch (JMSException e) {
                log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
                e.printStackTrace();
            }
        }

        return true;
    }

    @Override
    public void destroy() {
        // TODO Auto-generated method stub

    }

    @Override
    public void init(SynapseEnvironment arg0) {
        // TODO Auto-generated method stub

    }

}

When I call this service to send a message to the queue, the following log is generated:

[2017-07-29 11:18:35,962]  INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true}

So far everything works well. But when two users try to submit a message at the same time, something strange happens, as shown below:

[2017-07-29 11:43:11,948]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false}
[2017-07-29 11:43:11,963]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true}

[2017-07-29 11:43:12,068]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed

ActiveMQ creates two connections, but both calls end up using the same one. One service call closes that connection, and the other call then fails with an "already closed" error. The second connection is never closed: it stays in ActiveMQ's connection list with active status true, as shown in the image below, and it also appears in the ESB thread list.

Connections like this pile up and eventually hang the ESB server. Even if I reset them from ActiveMQ, the ESB threads still hold the connection information, and only restarting the ESB fixes the problem.

Answer 1:

回答1:

Have you read the article "Extending the Functionality of WSO2 Enterprise Service Bus - Part 1"?

The important part is "Threading Safety". It states that each mediator instance, including a custom one, is shared between incoming messages. I recommend moving the class variables

Connection connection;
Session session;

into the method public boolean mediate(MessageContext msgCtx), since local variables are thread-safe:

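public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle {

    public boolean mediate(MessageContext msgCtx) {
        // Locals belong to a single call, so concurrent requests no longer share them.
        // They start as null so the finally block can check them before closing.
        Connection connection = null;
        Session session = null;
    ....
    ....
    // The rest stays the same, except that "this.session = ..." becomes
    // "session = ..." and the finally block should null-check session
    // before calling session.close().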
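
To see why the shared fields produce exactly the symptoms in the question (one "already closed" error plus one connection that is never closed), here is a minimal sketch that uses only the Java standard library. Nothing in it is ActiveMQ or WSO2 API: FakeConnection, FakeBroker, Mediator and the two mediator classes are hypothetical stand-ins, and the CyclicBarrier is there only to force the interleaving in which both calls open their connection before either one closes it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class MediatorThreadSafetyDemo {

    /** Hypothetical stand-in for a JMS connection: a second close() fails. */
    static class FakeConnection {
        private boolean closed;

        synchronized void close() {
            if (closed) {
                throw new IllegalStateException("The connection is already closed");
            }
            closed = true;
        }

        synchronized boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical broker that remembers every connection it handed out. */
    static class FakeBroker {
        private final List<FakeConnection> opened =
                Collections.synchronizedList(new ArrayList<>());

        FakeConnection connect() {
            FakeConnection c = new FakeConnection();
            opened.add(c);
            return c;
        }

        /** Counts unclosed connections; call only after all workers have finished. */
        int leaked() {
            int n = 0;
            for (FakeConnection c : opened) {
                if (!c.isClosed()) n++;
            }
            return n;
        }
    }

    interface Mediator {
        void mediate(FakeBroker broker, CyclicBarrier bothOpened) throws Exception;
    }

    /** Mirrors the question: the connection lives in a field of the shared instance. */
    static class SharedFieldMediator implements Mediator {
        // volatile only makes the demo deterministic; it does NOT make the field safe.
        volatile FakeConnection connection;

        public void mediate(FakeBroker broker, CyclicBarrier bothOpened) throws Exception {
            connection = broker.connect(); // the later call overwrites the earlier one
            bothOpened.await();            // both calls have now assigned the field
            connection.close();            // both calls close the same object
        }
    }

    /** Mirrors the answer: the connection is a local variable of the call. */
    static class LocalVariableMediator implements Mediator {
        public void mediate(FakeBroker broker, CyclicBarrier bothOpened) throws Exception {
            FakeConnection connection = broker.connect();
            bothOpened.await();
            connection.close();            // each call closes its own connection
        }
    }

    /** Runs two concurrent calls on ONE mediator; returns {closeErrors, leakedConnections}. */
    static int[] run(Mediator mediator) throws InterruptedException {
        FakeBroker broker = new FakeBroker();
        CyclicBarrier bothOpened = new CyclicBarrier(2);
        AtomicInteger closeErrors = new AtomicInteger();
        Runnable call = () -> {
            try {
                mediator.mediate(broker, bothOpened);
            } catch (IllegalStateException e) {
                closeErrors.incrementAndGet();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        Thread a = new Thread(call);
        Thread b = new Thread(call);
        a.start();
        b.start();
        a.join();
        b.join();
        return new int[] { closeErrors.get(), broker.leaked() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] shared = run(new SharedFieldMediator());
        int[] local = run(new LocalVariableMediator());
        System.out.println("shared field:   closeErrors=" + shared[0] + " leaked=" + shared[1]);
        System.out.println("local variable: closeErrors=" + local[0] + " leaked=" + local[1]);
    }
}
```

With the shared field, the run reports one close error and one leaked connection, which matches the log and the stuck connection described in the question. With the local variable, both counts are zero. In the real mediator the interleaving depends on timing rather than on a barrier, which would explain why the problem only shows up when two requests arrive at almost the same moment.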