I am using spring-rabbit-1.7.3.RELEASE.jar
I have defined a SimpleMessageListenerContainer in my xml with shutdownTimeout parameter.
bean id="aContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="rabbitConnectionFactory" />
<property name="queueNames" value="aQueue" />
<property name="adviceChain" ref="retryAdvice" />
<property name="acknowledgeMode" value="AUTO" />
<property name="shutdownTimeout" value="900000" />
</bean>
When my service shuts down and there are still messages in "aQueue", I expect that the shutdownTimeout would allow the messages to get processed. But this doesn't seem to happen.
On further investigation I found out that the await() method defined in SimpleMessageListenerContainer is always returning true.
this.cancellationLock.await(Long.valueOf(this.shutdownTimeout), TimeUnit.MILLISECONDS);
I would like to understand how the logic for await works, how it acquires lock and what additional configuration is required at my end to make the code work.
It waits for the consumers on-the-fly, those who are busy to process already fetched but not yet acknowledged messages. Nobody is going to poll fresh messages from the queue during shutdown.
The
ActiveObjectCounter
awaits on the all internalCountDownLatch
s to be released. And that happens when we:So, that really might be a fact that all your consumers (
private volatile int concurrentConsumers = 1;
by default) are cancelled and released during thatshutdownTimeout
.But again: no body is going to poll new messages from the Broker when the state is
shutdown
.