I think that org.apache.synapse.transport.passthru.PassThroughHttpSender, the default HTTP transport sender in WSO2 ESB 4.8.1 (I have not checked 4.9.0 yet), in some cases does not return a borrowed thread to the worker pool when an exception occurs.
It seems to happen when the exception occurs in a separate sequence, not in the proxy that directly accepts the incoming request. For example, it occurs when you implement the store-and-forward pattern with a message store and a message processor.
This is my simple WSO2 ESB 4.8.1 configuration to reproduce the issue:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
  <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
    <parameter name="cachableDuration">15000</parameter>
  </registry>
  <import name="fileconnector"
          package="org.wso2.carbon.connector"
          status="enabled"/>
  <proxy name="ProxyTest"
         transports="http https"
         startOnLoad="true"
         trace="disable">
    <target>
      <inSequence>
        <property name="FORCE_SC_ACCEPTED"
                  value="true"
                  scope="axis2"
                  type="STRING"/>
        <log level="custom">
          <property name="text" value="store message"/>
        </log>
        <store messageStore="TestXMS"/>
        <log level="custom">
          <property name="text" value="message stored"/>
        </log>
      </inSequence>
      <outSequence/>
      <faultSequence/>
    </target>
  </proxy>
  <localEntry key="ESBInstance">Test<description/>
  </localEntry>
  <endpoint name="HTTPEndpoint">
    <http method="post" uri-template="http://localhost/index.php">
      <timeout>
        <duration>10</duration>
        <responseAction>fault</responseAction>
      </timeout>
      <suspendOnFailure>
        <errorCodes>-1</errorCodes>
        <initialDuration>0</initialDuration>
        <progressionFactor>1.0</progressionFactor>
        <maximumDuration>0</maximumDuration>
      </suspendOnFailure>
      <markForSuspension>
        <errorCodes>-1</errorCodes>
      </markForSuspension>
    </http>
  </endpoint>
  <sequence name="fault">
    <log level="full">
      <property name="MESSAGE" value="Executing default 'fault' sequence"/>
      <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
      <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
    </log>
    <drop/>
  </sequence>
  <sequence name="TestSequence">
    <log level="full">
      <property name="text" value="message received"/>
    </log>
    <call>
      <endpoint key="HTTPEndpoint"/>
    </call>
    <log level="full">
      <property name="text" value="message processed"/>
    </log>
  </sequence>
  <sequence name="main">
    <in>
      <log level="full"/>
      <filter source="get-property('To')" regex="http://localhost:9000.*">
        <send/>
      </filter>
    </in>
    <out>
      <send/>
    </out>
    <description>The main sequence for the message mediation</description>
  </sequence>
  <messageStore name="TestXMS"/>
  <messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
                    name="TestMP"
                    messageStore="TestXMS">
    <parameter name="interval">1000</parameter>
    <parameter name="sequence">TestSequence</parameter>
    <parameter name="concurrency">1</parameter>
    <parameter name="is.active">true</parameter>
  </messageProcessor>
</definitions>
Configure the endpoint to send requests to an address where nothing is listening, then call ProxyTest 20 times or more to exhaust the internal pool of worker threads.
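A minimal load client for this step might look like the following. It is a sketch that assumes Java 11+ and the usual WSO2 ESB defaults of HTTP port 8280 and the /services/ProxyTest path, so adjust both for your installation. The XML payload is an arbitrary placeholder, since the proxy only stores whatever it receives.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Reproduction client: sends N POST requests to ProxyTest.
// ASSUMPTION: ESB listens on the default HTTP port 8280 and exposes the
// proxy under /services/ProxyTest; change PROXY_URI if your setup differs.
class ProxyTestLoad {
    static final URI PROXY_URI = URI.create("http://localhost:8280/services/ProxyTest");

    // Builds one request; the body is a placeholder XML document.
    static HttpRequest buildRequest(int n) {
        String body = "<test><n>" + n + "</n></test>";
        return HttpRequest.newBuilder(PROXY_URI)
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "text/xml")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        int count = args.length > 0 ? Integer.parseInt(args[0]) : 25;
        for (int i = 1; i <= count; i++) {
            try {
                HttpResponse<String> resp =
                        client.send(buildRequest(i), HttpResponse.BodyHandlers.ofString());
                // FORCE_SC_ACCEPTED makes the proxy reply as soon as the message is stored
                System.out.println("request " + i + " -> HTTP " + resp.statusCode());
            } catch (IOException e) {
                System.out.println("request " + i + " failed: " + e);
            }
        }
    }
}
```

Run it with an optional request count (default 25), for example `java ProxyTestLoad 30`.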
After the 20th request, new requests are still accepted and stored in the message store, but the worker pool is exhausted, so the stored messages are no longer retried.
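The symptom can be modelled outside the ESB with a plain fixed-size pool. The sketch below is a toy model, not Synapse code. The pool size of 20 only mirrors the "20 requests" observation above and is not a claim about the ESB's actual worker-pool configuration. Each "stuck" task parks its worker in `wait()` with no matching notify. Once every worker is parked, a new task is queued and never runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the symptom (not ESB code): a fixed-size worker pool in which
// some tasks park their thread on a monitor that is never notified.
class PoolExhaustionDemo {
    // Returns true if one extra task still gets a worker thread after `stuck`
    // tasks have each blocked a worker in wait() with no matching notify.
    static boolean freshTaskRuns(int poolSize, int stuck) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Object lock = new Object();
        for (int i = 0; i < stuck; i++) {
            pool.submit(() -> {
                synchronized (lock) {
                    try {
                        lock.wait();            // like a msgContext.wait() that nobody notifies
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        CountDownLatch ran = new CountDownLatch(1);
        pool.submit(ran::countDown);            // stands in for the next message processor run
        try {
            return ran.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();                 // interrupts the parked workers so the JVM can exit
        }
    }
}
```

With a pool of 20, `freshTaskRuns(20, 19)` returns true because one worker is still free. `freshTaskRuns(20, 20)` returns false because the extra task waits in the queue behind permanently blocked workers.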
The source code for PassThroughHttpSender is probably available somewhere, but it was quicker to decompile repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar and look inside for the cause of the problem.
The relevant part is the sendRequestContent method:
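synchronized (msgContext) {
    // Block until the request stream is ready or a connect error has been flagged
    while ((!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE")))
            && (!Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR")))) {
        try {
            log.info("msgContext before wait");
            msgContext.wait();  // no timeout: only a notify()/notifyAll() on msgContext wakes it
            log.info("msgContext after wait");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // ... rest of the synchronized block omitted
}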
This is where processing gets stuck on error. When the exception occurs in another thread, that thread never notifies the waiting one, so msgContext.wait() blocks forever (in practice, until the server is restarted).
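The failure mode can be reproduced in isolation with a toy model of the same guarded wait. A `ConcurrentHashMap` is used here as a hypothetical stand-in for the Axis2 `MessageContext`, and only the two property names come from the decompiled code. The "connector" thread handles its error without setting either flag or calling notify, so the "sender" stays parked.

```java
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the wait loop in sendRequestContent. The map is a hypothetical
// stand-in for MessageContext; only the property names come from Synapse.
class LostWakeupDemo {
    static boolean mayProceed(ConcurrentHashMap<String, Object> ctx) {
        return Boolean.TRUE.equals(ctx.get("WAIT_BUILDER_IN_STREAM_COMPLETE"))
                || Boolean.TRUE.equals(ctx.get("PASSTHRU_CONNECT_ERROR"));
    }

    // Returns true if the "sender" thread is still parked after waitMillis,
    // i.e. the error path finished without ever waking it up.
    static boolean senderStuck(long waitMillis) {
        ConcurrentHashMap<String, Object> ctx = new ConcurrentHashMap<>();
        Thread sender = new Thread(() -> {
            synchronized (ctx) {
                while (!mayProceed(ctx)) {
                    try {
                        ctx.wait();                 // unbounded wait, as in the sender
                    } catch (InterruptedException e) {
                        return;                     // only used to end the demo
                    }
                }
            }
        });
        // Error path that handles the failure but never touches ctx.
        Thread connector = new Thread(() -> { /* handleError(...) runs here */ });
        sender.start();
        connector.start();
        try {
            connector.join();
            sender.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stuck = sender.isAlive();
        sender.interrupt();                         // release the thread so the JVM can exit
        return stuck;
    }
}
```

`senderStuck(300)` returns true. In the ESB there is no equivalent of the final `interrupt()`, so the worker thread stays parked.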
So I slightly modified another class in the same package, DeliveryAgent, specifically its errorConnecting method. This method receives the callback from the thread that connects to the target host when that connection fails. After the targetErrorHandler call, I added a new synchronized block that sets PASSTHRU_CONNECT_ERROR on the message context and calls notifyAll(), so the waiting thread can continue:
public void errorConnecting(HttpRoute route, int errorCode, String message) {
    Queue<MessageContext> queue = (Queue) this.waitingMessages.get(route);
    if (queue != null) {
        MessageContext msgCtx = (MessageContext) queue.poll();
        if (msgCtx != null) {
            this.targetErrorHandler.handleError(msgCtx, errorCode, "Error connecting to the back end", null, ProtocolState.REQUEST_READY);
            // Added: wake up the thread blocked in PassThroughHttpSender.sendRequestContent
            synchronized (msgCtx) {
                log.info("errorConnecting: notify message context about error");
                msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
                msgCtx.notifyAll();
            }
        }
    } else {
        throw new IllegalStateException("Queue cannot be null for: " + route);
    }
}
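The patched errorConnecting path can be checked with a toy model as well. It uses a `ConcurrentHashMap` as a hypothetical stand-in for `MessageContext`. The model sets `PASSTHRU_CONNECT_ERROR` and calls `notifyAll()` while holding the monitor the sender waits on. This releases the sender whether the connector runs before or after the sender starts waiting: in the first case the loop condition is already true, and in the second the notify wakes it.

```java
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the patched errorConnecting path. The map is a hypothetical
// stand-in for MessageContext; only the property names come from Synapse.
class PatchedWakeupDemo {
    // Returns true if the "sender" thread finishes within waitMillis.
    static boolean senderReleased(long waitMillis) {
        ConcurrentHashMap<String, Object> ctx = new ConcurrentHashMap<>();
        Thread sender = new Thread(() -> {
            synchronized (ctx) {
                while (!Boolean.TRUE.equals(ctx.get("WAIT_BUILDER_IN_STREAM_COMPLETE"))
                        && !Boolean.TRUE.equals(ctx.get("PASSTHRU_CONNECT_ERROR"))) {
                    try {
                        ctx.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        Thread connector = new Thread(() -> {
            // targetErrorHandler.handleError(...) would run here first
            synchronized (ctx) {                    // notifyAll() requires owning the monitor
                ctx.put("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
                ctx.notifyAll();
            }
        });
        sender.start();
        connector.start();
        try {
            sender.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean released = !sender.isAlive();
        sender.interrupt();                         // no-op if the sender already finished
        return released;
    }
}
```

The synchronized block in the patch is required rather than stylistic. Calling notifyAll() on an object whose monitor the current thread does not own throws IllegalMonitorStateException.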
I have done some tests, and it looks like this fixes the problem of "dead" threads. However, I am not sure whether it is a proper fix, so any suggestions are welcome.
Link to the decompiled and modified source files: src.zip
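One possible complement to the patch, offered as an untested sketch rather than a WSO2 fix, is to replace the unbounded msgContext.wait() with a deadline-bounded wait. Then any error path that forgets to notify can hold a worker thread only for a limited time. `BoundedWait` and `awaitCondition` are hypothetical names, not part of Synapse.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Synapse): wait on a monitor for a condition,
// but give up after a deadline instead of parking the thread forever.
class BoundedWait {
    // Returns true if `ready` became true, false on timeout or interruption.
    static boolean awaitCondition(Object lock, BooleanSupplier ready, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {                    // reentrant, so callers may already hold it
            while (!ready.getAsBoolean()) {
                long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remaining <= 0) {
                    return false;                // timed out: caller should treat it as a failure
                }
                try {
                    lock.wait(remaining);        // loop re-checks after spurious wake-ups
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // restore the flag instead of printStackTrace()
                    return false;
                }
            }
            return true;
        }
    }
}
```

In sendRequestContent, the ready check would be the same two property tests on msgContext. The timeout value, and what the sender should do when awaitCondition returns false, are open questions that depend on the transport internals.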