ActiveMQ's Messages being redelivered (PooledC

Published 2019-05-10 21:28

Question:

I have an application that uses Spring MessageListenerContainers with ActiveMQ. I've configured it to use PooledConnectionFactory, following the ActiveMQ documentation.

My scenario is as follows:

  • Two Java application consumers (on 2 different Tomcat instances) reading from one queue
  • Master/slave ActiveMQ brokers, with a failover URL
  • PooledConnectionFactory configured
  • Camel in the application
  • ActiveMQ 5.8.0, with commons-pool 1.6

Everything seems to work, but once in a while messages are redelivered to the consumers, which causes an application error.

Following the logs, I suspect this error is caused by a misconfiguration in Spring's PooledConnectionFactory bean definitions: the pool closes connections that are still in use, which forces a rollback of the current session and, consequently, redelivery of the message.

The Spring configuration of the pool is as follows:

<bean id="AMQconnFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${activemq_url}?initialReconnectDelay=100&amp;timeout=3000&amp;jms.prefetchPolicy.all=1&amp;jms.redeliveryPolicy.initialRedeliveryDelay=300000
            </value>
        </property>
    </bean>
    <bean id="sharedConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" abstract="true">
        <property name="connectionFactory" ref="AMQconnFactory" />
        <property name="maxConnections" value="30" />
    </bean>

    <bean id="MotionJMSConnectionFactory" parent="sharedConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop" />

<bean id="sharedListenersProps" abstract="true" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="maxConcurrentConsumers" value="1" />
        <property name="concurrentConsumers" value="1" />
        <property name="connectionFactory" ref="MotionJMSConnectionFactory" />
        <property name="sessionTransacted" value="true" />
        <property name="transactionTimeout" value="300"/>
    </bean>

The logs which pointed me to that direction are the following:

[TRACE] (ActiveMQMessageConsumer.java:494) (09:48:16,412) - ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222:1:1 received message: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222:1:1, destination = queue://EntradaQueue, message = ActiveMQObjectMessage {commandId = 12999, responseRequired = true, messageId = ID:bbiovx01.bglobal.bcorp-33973-1365792268762-1:28:2:1:12, originalDestination = null, originalTransactionId = null, producerId = ID:bbiovx01.bglobal.bcorp-33973-1365792268762-1:28:2:1, destination = queue://EntradaQueue, transactionId = null, expiration = 0, timestamp = 1365857296363, arrival = 0, brokerInTime = 1365857296364, brokerOutTime = 1365857296410, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@31efa8c3, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}, redeliveryCounter = 0}
[DEBUG] (AbstractPollingMessageListenerContainer.java:313) (09:48:16,412) - Received message of type [class org.apache.activemq.command.ActiveMQObjectMessage] from consumer [PooledMessageConsumer { ActiveMQMessageConsumer { value=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222:1:1, started=true } }] of session [PooledSession { ActiveMQSession {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222:1,started=true} }]
<!-- app LOG -->
[DEBUG] (MDPEntrada.java:53) (09:48:16,413) - Mensagem de entrada recebida ActiveMQObjectMessage {commandId = 12999, responseRequired = true, messageId = ID:bbiovx01.bglobal.bcorp-33973-1365792268762-1:28:2:1:12, originalDestination = null, originalTransactionId = null, producerId = ID:bbiovx01.bglobal.bcorp-33973-1365792268762-1:28:2:1, destination = queue://EntradaQueue, transactionId = null, expiration = 0, timestamp = 1365857296363, arrival = 0, brokerInTime = 1365857296364, brokerOutTime = 1365857296410, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@31efa8c3, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,295) - Destroying connection: ConnectionPool[null]
[TRACE] (PooledConnectionFactory.java:154) (09:48:17,295) - Connection has expired: ConnectionPool[null] and will be destroyed
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,295) - Destroying connection: ConnectionPool[null]
[TRACE] (PooledConnectionFactory.java:154) (09:48:17,296) - Connection has expired: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222,clientId=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-0:3218,started=true}] and will be destroyed
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,296) - Destroying connection: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222,clientId=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-0:3218,started=true}]
[TRACE] (PooledConnectionFactory.java:154) (09:48:17,297) - Connection has expired: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3221,clientId=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-0:3217,started=true}] and will be destroyed
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,297) - Destroying connection: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3221,clientId=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-0:3217,started=true}]
[WARN ] (PooledSession.java:123) (09:48:17,297) - Caught exception trying rollback() when putting session back into the pool, will invalidate. javax.jms.IllegalStateException: The Session is closed
javax.jms.IllegalStateException: The Session is closed
        at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
        at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:574)
        at org.apache.activemq.pool.PooledSession.close(PooledSession.java:120)
        at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.clearResources(DefaultMessageListenerContainer.java:1099)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
        at java.lang.Thread.run(Thread.java:662)
[WARN ] (DefaultMessageListenerContainer.java:818) (09:48:17,297) - Setup of JMS message listener invoker failed for destination 'topic://AtualizaAssinaturasETemplatesTopic' - trying to recover. Cause: The Session is closed
javax.jms.IllegalStateException: The Session is closed
        at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
        at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:521)
        at org.apache.activemq.pool.PooledSession.getTransacted(PooledSession.java:259)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:571)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:358)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
        at java.lang.Thread.run(Thread.java:662)
[TRACE] (PooledConnectionFactory.java:154) (09:48:17,302) - Connection has expired: ConnectionPool[null] and will be destroyed
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,302) - Destroying connection: ConnectionPool[null]
[TRACE] (PooledConnectionFactory.java:154) (09:48:17,302) - Connection has expired: ConnectionPool[null] and will be destroyed
[TRACE] (PooledConnectionFactory.java:121) (09:48:17,303) - Destroying connection: ConnectionPool[null]
[TRACE] (PooledConnectionFactory.java:140) (09:48:17,303) - Created new connection: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3320,clientId=null,started=false}]
[INFO ] (DefaultMessageListenerContainer.java:862) (09:48:17,306) - Successfully refreshed JMS Connection
[INFO ] (SEntradaImpl.java:368) (09:48:17,311) - Tempo de Processamento da requisicao: 843ms
[INFO ] (SEntradaImpl.java:227) (09:48:17,311) - * FINALIZANDO ENTRADA *
[WARN ] (AbstractMessageListenerContainer.java:694) (09:48:17,312) - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.IllegalStateException: The Session is closed
        at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
        at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:521)
        at org.apache.activemq.pool.PooledSession.getTransacted(PooledSession.java:259)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:571)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:481)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
        at java.lang.Thread.run(Thread.java:662)
[WARN ] (PooledSession.java:123) (09:48:17,312) - Caught exception trying rollback() when putting session back into the pool, will invalidate. javax.jms.IllegalStateException: The Session is closed
javax.jms.IllegalStateException: The Session is closed
        at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
        at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:574)
        at org.apache.activemq.pool.PooledSession.close(PooledSession.java:120)
        at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.clearResources(DefaultMessageListenerContainer.java:1099)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
        at java.lang.Thread.run(Thread.java:662)
[WARN ] (DefaultMessageListenerContainer.java:818) (09:48:17,312) - Setup of JMS message listener invoker failed for destination 'queue://EntradaQueue' - trying to recover. Cause: The Session is closed
javax.jms.IllegalStateException: The Session is closed
        at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
        at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:521)
        at org.apache.activemq.pool.PooledSession.getTransacted(PooledSession.java:259)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:571)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:481)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1059)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
        at java.lang.Thread.run(Thread.java:662)
[TRACE] (PooledConnectionFactory.java:140) (09:48:17,312) - Created new connection: ConnectionPool[ActiveMQConnection {id=ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3322,clientId=null,started=false}]
[INFO ] (DefaultMessageListenerContainer.java:862) (09:48:17,316) - Successfully refreshed JMS Connection

I can provide some more details if needed.

best regards.

EDIT:

I've also tried reducing the pool size to 1, and now I'm getting the following stack trace (WARN) more often:

[WARN ] (PooledSession.java:123) (02:00:00,160) - Caught exception trying rollback() when putting session back into the pool, will invalidate. javax.jms.IllegalStateException: The Session is closed
javax.jms.IllegalStateException: The Session is closed
    at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
    at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:574)
    at org.apache.activemq.pool.PooledSession.close(PooledSession.java:120)
    at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.clearResources(DefaultMessageListenerContainer.java:1099)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
    at java.lang.Thread.run(Thread.java:662)
[WARN ] (PooledSession.java:123) (02:00:00,160) - Caught exception trying rollback() when putting session back into the pool, will invalidate. javax.jms.IllegalStateException: The Session is closed
javax.jms.IllegalStateException: The Session is closed
    at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:731)
    at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:574)
    at org.apache.activemq.pool.PooledSession.close(PooledSession.java:120)
    at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.clearResources(DefaultMessageListenerContainer.java:1099)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
    at java.lang.Thread.run(Thread.java:662)

Answer 1:

This is caused by a bug in the PooledConnection code, and will be fixed in version 5.9.0. A workaround is to set the idle timeout on PooledConnectionFactory to 0.

PooledConnectionFactory.setIdleTimeout(0)


Answer 2:

Add a property

<property name="idleTimeout" value="0"/>

to your PooledConnectionFactory
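
Applied to the configuration in the question, the workaround might look like the sketch below. It assumes the property is set on the abstract parent bean `sharedConnectionFactory`, so that `MotionJMSConnectionFactory` inherits it through its `parent` attribute. The bean ids and the other properties are copied from the question. Only the `idleTimeout` line is new.

```xml
<bean id="sharedConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" abstract="true">
    <property name="connectionFactory" ref="AMQconnFactory" />
    <property name="maxConnections" value="30" />
    <!-- Workaround from the answers: 0 turns off idle-timeout expiry of pooled connections -->
    <property name="idleTimeout" value="0" />
</bean>
```

With idle expiry turned off, pooled connections are no longer closed for being idle, so it is worth confirming afterwards that the "Connection has expired" TRACE lines stop appearing in the logs.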