Spring Integration jms message driven channel adap

Posted 2019-05-30 23:39

Question:

I am using Spring Integration 4.1.0 to consume messages from a TIBCO EMS queue using int-jms:message-driven-channel-adapter.

I have 9 different message-driven channel adapters, each listening to a different queue on a different server.

Below is how the message-driven channel adapters are defined:

<int-jms:message-driven-channel-adapter
        id="mdca1" connection-factory="connectionFactory1"
        channel="jmsChannel1" destination="queueName1"
        error-channel="errorChannel" max-concurrent-consumers="5" auto-startup="true"/>

<int-jms:message-driven-channel-adapter
        id="mdca2" connection-factory="connectionFactory2"
        channel="jmsChannel2" destination="queueName2"
        error-channel="errorChannel" max-concurrent-consumers="5" auto-startup="true"/>

<bean id="connectionFactory1" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="tcf1"/>
    <property name="sessionCacheSize" value="${sessionCacheSize}"/>
    <property name="cacheProducers" value="${cacheProducers}"/>
    <property name="cacheConsumers" value="${cacheConsumers}"/>
</bean>
<bean id="tcf1" class="${connectionFactoryClassName}">
    <property name="serverUrl" value="${serverUrl1}"/>
    <property name="userName" value="${username1}"/>
    <property name="userPassword" value="${password1}"/>
</bean>

<bean id="connectionFactory2" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="tcf2"/>
    <property name="sessionCacheSize" value="${sessionCacheSize}"/>
    <property name="cacheProducers" value="${cacheProducers}"/>
    <property name="cacheConsumers" value="${cacheConsumers}"/>
</bean>
<bean id="tcf2" class="${connectionFactoryClassName}">
    <property name="serverUrl" value="${serverUrl2}"/>
    <property name="userName" value="${username2}"/>
    <property name="userPassword" value="${password2}"/>
</bean>

I have my application deployed on Tomcat, and initially everything works fine. All the adapters read messages from their queues and send them for processing.

But for some reason, after some time the messages are no longer picked up from the queue. Currently I have to restart Tomcat to get the adapters working again. I have a button in the UI to start/stop an adapter, and it works fine while messages are being picked up: I can stop an adapter and it stops picking up messages, and I can start it and it picks them up again. The issue is that after an adapter has been running for some time, say 5-10 hours, a message sits in the queue and the adapter does not pick it up even though it is in running status. At that point the start/stop button stops working too.

Can anyone help with what the issue could be? Why do the adapters fail after a certain time, such as 5-10 hours?

Any help is highly appreciated.

Update: Here is the stack trace from jstack when the listener fails.

"TIBCO EMS TCPLink Reader (Server-15108908)" daemon prio=10 tid=0x00007f1874115000 nid=0xc8f runnable [0x00007f18b2f1d000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    - locked <0x00000006c2d8ea68> (a java.io.BufferedInputStream)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at com.tibco.tibjms.TibjmsxLinkTcp._readWireMsg(TibjmsxLinkTcp.java:625)
    at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.work(TibjmsxLinkTcp.java:280)
    at com.tibco.tibjms.TibjmsxLinkTcp$LinkReader.run(TibjmsxLinkTcp.java:259)

"org.springframework.jms.listener.DefaultMessageListenerContainer#1-5" prio=10 tid=0x00000000019de000 nid=0xc8e in Object.wait() [0x00007f18b301e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000006c2eabdf8> (a java.lang.Object)
    at com.tibco.tibjms.TibjmsxSessionImp._getSyncMessage(TibjmsxSessionImp.java:2288)
    at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:2122)
    - locked <0x00000006c2eabdf8> (a java.lang.Object)
    at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:276)
    at com.tibco.tibjms.TibjmsMessageConsumer.receive(TibjmsMessageConsumer.java:481)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1134)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1031)
    at java.lang.Thread.run(Thread.java:745)

"org.springframework.jms.listener.DefaultMessageListenerContainer#3-2" prio=10 tid=0x00007f18ac001000 nid=0xc8d in Object.wait() [0x00007f18b311f000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000006c2e30270> (a java.lang.Object)
    at com.tibco.tibjms.TibjmsxSessionImp._getSyncMessage(TibjmsxSessionImp.java:2288)
    at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:2122)
    - locked <0x00000006c2e30270> (a java.lang.Object)
    at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:276)
    at com.tibco.tibjms.TibjmsMessageConsumer.receive(TibjmsMessageConsumer.java:481)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:82)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1134)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1031)
    at java.lang.Thread.run(Thread.java:745)

Any comments?

Answer 1:

This is generally caused by some component in the downstream flow hanging the container thread uninterruptibly (e.g. reading from a socket with no timeout, where no data arrives).
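To illustrate the socket case mentioned above, here is a minimal sketch of a read that gives up after a timeout instead of blocking forever. It is not taken from the asker's flow: the class name `ReadTimeoutSketch`, the helper names, and the `-2` return convention are all made up for the example, and the "silent peer" is just a local `ServerSocket` that accepts a connection and never writes to it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical example class; the names here are not part of any library.
final class ReadTimeoutSketch {

    /**
     * Reads one byte from the peer, giving up after timeoutMillis.
     * Returns the byte read (0-255), -1 on end of stream, or -2 if the read timed out.
     */
    static int readWithTimeout(Socket socket, int timeoutMillis) throws IOException {
        // A timeout of 0 (the default) means "block until data arrives",
        // which is the kind of read that can hang a container thread.
        socket.setSoTimeout(timeoutMillis);
        try {
            return socket.getInputStream().read();
        } catch (SocketTimeoutException e) {
            return -2; // no data arrived in time; the calling thread is free again
        }
    }

    /** Connects to a local peer that never sends data; returns true if the read timed out. */
    static boolean silentPeerTimesOut(int timeoutMillis) {
        try (ServerSocket silentServer = new ServerSocket(0);
             Socket client = new Socket(InetAddress.getLoopbackAddress(),
                                        silentServer.getLocalPort());
             Socket accepted = silentServer.accept()) {
            return readWithTimeout(client, timeoutMillis) == -2;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(silentPeerTimesOut(500)
                ? "read timed out, thread released"
                : "read returned data or end of stream");
    }
}
```

Without the `setSoTimeout` call, the same read against the silent peer would never return, and a listener thread stuck in it could not be stopped from the UI either.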

To diagnose, take a thread dump (e.g. with jstack) when the hang occurs, to find out what the listener container threads are doing.
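If running jstack against the Tomcat process is inconvenient, a similar dump can be produced from inside the JVM with the standard `ThreadMXBean`. The sketch below is only an illustration: the class name `ListenerThreadDump` and the idea of filtering by thread-name prefix are assumptions, and the default prefix is simply the thread name that appears in the dump in the question. It would have to be called from code running inside the same JVM as the adapters (for example from an admin endpoint).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of Spring or TIBCO EMS.
final class ListenerThreadDump {

    /** Returns one formatted entry per live thread whose name starts with namePrefix. */
    static List<String> dump(String namePrefix) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> entries = new ArrayList<>();
        // false, false: skip locked-monitor and synchronizer details,
        // which are optional features that not every JVM supports.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            StringBuilder entry = new StringBuilder();
            entry.append('"').append(info.getThreadName()).append("\" ")
                 .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                entry.append("    at ").append(frame).append('\n');
            }
            entries.add(entry.toString());
        }
        return entries;
    }

    public static void main(String[] args) {
        // Default prefix taken from the thread names shown in the question's jstack output.
        String prefix = args.length > 0
                ? args[0]
                : "org.springframework.jms.listener.DefaultMessageListenerContainer";
        dump(prefix).forEach(System.out::println);
    }
}
```

Threads that are stuck in the same non-JMS frame across several dumps taken a few minutes apart are the ones worth looking at; threads in `TIMED_WAITING` inside `receive`, like the two in the question, are the idle polling state.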