Spring integration deadlock using Aggregator + Mes

Posted 2019-07-15 22:17

Question:

This question follows up on a post in the Spring Integration (SI) forum. Since the forum is now closed, I am posting here to continue the thread:

http://forum.spring.io/forum/spring-projects/integration/748192-messages-not-flowing-when-using-jms-channels

To sum up, I have an aggregator backed by a Redis message store, with a reaper scheduled to run every 60 seconds. Messages are sent to the aggregator through a JMS-backed channel. Here's the config:

<bean id="jedisPoolConfigBean" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxActive" value="10" />
    <property name="maxIdle" value="5" />
    <property name="minIdle" value="1" />
    <property name="testOnBorrow" value="true" />
    <property name="testOnReturn" value="true" />
    <property name="testWhileIdle" value="true" />
</bean>

<bean id="loyaltyRedisConnectionFactory"
    class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="hostName" value="${redis.hostName}" />
    <property name="database" value="${redis.loyalty.database}" />
    <property name="port" value="${redis.port}" />
    <property name="poolConfig" ref="jedisPoolConfigBean" />
</bean>

<bean id="loyaltyAggregatorRedisMessageStore"
    class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="loyaltyRedisConnectionFactory" />
</bean>

<task:scheduler id="loyaltyScheduler" 
      pool-size="${loyalty.aggregator.reaper.pool_size}"/>

<task:scheduled-tasks scheduler="loyaltyScheduler">
    <task:scheduled ref="loyaltyReaperBean" method="run" fixed-rate="60000" />
</task:scheduled-tasks>

<bean id="loyaltyReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="loyaltyAggregatorRedisMessageStore" />
    <property name="timeout" value="30000" />
</bean>

I'm fairly new to thread dumps, but as far as I can see, the DefaultMessageListenerContainer threads are blocked by the task scheduler thread (loyaltyScheduler-1) that runs the MessageGroupStoreReaper. Specifically, they are all waiting on a ReentrantLock that the scheduler thread owns.

Any ideas? Maybe I need some additional configuration to avoid this.

Any help is appreciated

Thanks in advance! Guzman

"loyaltyScheduler-1" - Thread t@57
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <47e54701> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

  Locked ownable synchronizers:
  - locked <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)


"DefaultMessageListenerContainer-6" - Thread t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)

"DefaultMessageListenerContainer-7" - Thread t@80
 java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)

"DefaultMessageListenerContainer-8" - Thread t@83
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
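
One way to read the dump above: "loyaltyScheduler-1" is parked in DelayedWorkQueue.take, so it is idle and waiting for its next scheduled run, yet it is still listed as the owner of lock <368788cf>. That pattern is what you would see if a pooled thread acquired a ReentrantLock and went back to the pool without releasing it. The following is a minimal, standalone sketch of that general situation using only java.util.concurrent. It is not Spring Integration code and does not claim to show where the lock is actually leaked; the class name LeakedLockDemo and the method listenerCanAcquire are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LeakedLockDemo {

    /**
     * Runs one task on a single pooled worker thread that takes the lock and
     * (optionally) releases it, then reports whether the calling thread can
     * acquire the same lock within waitMillis.
     */
    static boolean listenerCanAcquire(boolean workerUnlocks, long waitMillis) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        // Stand-in for the scheduler pool (hypothetical, pool size 1).
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        try {
            scheduler.submit(() -> {
                lock.lock();
                if (workerUnlocks) {
                    lock.unlock();
                }
                // If workerUnlocks is false, the task ends while the worker
                // thread still owns the lock.
            }).get(); // wait until the task has finished; the worker is now idle

            // Stand-in for a listener thread trying to enter the guarded section.
            boolean acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();
            }
            return acquired;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("released lock acquirable: " + listenerCanAcquire(true, 200));
        System.out.println("leaked lock acquirable: " + listenerCanAcquire(false, 200));
    }
}
```

In the leaked case the worker thread stays alive and idle in the pool, so the lock is never freed. A caller that uses lockInterruptibly() without a timeout, as in the AbstractCorrelatingMessageHandler frames above, would wait indefinitely instead of giving up after 200 ms.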