RabbitMQ: Failed message being reprocessed in an

Posted 2019-07-19 05:49

Question:

This is my RabbitMQ configuration:

<rabbit:admin connection-factory="rmqConnectionFactory"/>

<bean id="retryAdvice" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="retryOperations" ref="retryTemplate"/>
</bean>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy" ref="simpleRetryPolicy"/>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
            <property name="backOffPeriod" value="5000"/>
        </bean>
    </property>
    <property name="retryContextCache" ref="retryContext"/>
</bean>

<bean id="retryContext" class="org.springframework.retry.policy.MapRetryContextCache"/>

<bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
    <property name="maxAttempts" value="3"/>
</bean>

<!-- Spring AMQP Template -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rmqConnectionFactory"/>
    <property name="messageConverter" ref="stdJsonMessageConverter"/>
</bean>

<bean id="stdJsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
    <property name="createMessageIds" value="true"/>
</bean>

My queue is configured as follows:

<rabbit:queue name="${queue}" durable="true">
    <rabbit:queue-arguments>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="${exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${queue}" key="${routingKey}"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

When I published a message to the exchange for the first time, the listener failed with a null ID exception. I also purged the queue that held the bad message. In spite of that, every time I start my service, processing of the failed message is retried, and it keeps failing until a RetryCacheCapacityExceeded exception is thrown.

Has my failed message been cached somewhere? Is there a way to clear it? Also, why do the retries continue even though my retryTemplate specifies 3 attempts with a 5-second interval between them?

Answer 1:

When you use stateful retry, the retry state for each message ID is kept in a cache (so we know when to stop).

If there's no ID, the message will fail (and keep being delivered) unless you add a MissingMessageIdAdvice to the advice chain (before the retry interceptor), which will allow 1 retry for messages with no id.
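
As a rough sketch of what that could look like with the beans from the question: the listener container is not shown in the question, so the container definition below is an assumption, and the bean ids missingIdAdvice, listenerContainer and myListener are hypothetical names. MissingMessageIdAdvice takes a RetryContextCache as its constructor argument, and here it reuses the existing retryContext bean. It is listed first in the advice chain so that it runs before the retry interceptor.

```xml
<!-- Hypothetical bean id; allows one retry for messages that arrive without a message ID -->
<bean id="missingIdAdvice" class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
    <constructor-arg ref="retryContext"/>
</bean>

<!-- Assumed container definition; adapt to however the listener is actually declared -->
<bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rmqConnectionFactory"/>
    <property name="queueNames" value="${queue}"/>
    <!-- myListener is a placeholder for the application's own listener bean -->
    <property name="messageListener" ref="myListener"/>
    <property name="adviceChain">
        <list>
            <!-- Order matters: the missing-ID advice goes before the retry interceptor -->
            <ref bean="missingIdAdvice"/>
            <ref bean="retryAdvice"/>
        </list>
    </property>
</bean>
```

If the publisher can be changed instead, making sure every message carries an ID (as the createMessageIds setting on the converter in the question is meant to do for messages sent through that rabbitTemplate) should avoid the missing-ID path entirely.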