SimpleMessageListenerContainer is continuously con

2019-09-11 01:32发布

问题:

I don't want the SimpleMessageListenerContainer to continuously consume the messages when my Transaction marked as rollback.

I didn't add listener-container in my configuration file and trying to rethrow the AmqpRejectAndDontRequeueException. It is not working. Here is my codebase:

@Transactional
 public class MySecondService {

    @Resource
    private MySecondRepository mySecondRepository;

    @Transactional(propagation = Propagation.REQUIRED)
    @ServiceActivator(inputChannel = "my-first-servie-output-channel",
                      outputChannel = "my-Second-servie-output-channel")
    public String saveEntity(final MyTestEntity myTestEntity) 
        throws AmqpRejectAndDontRequeueException {
         try {
            mySecondRepository.save(myTestEntity);
         } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("exception");
        }
    }
} 

spring-integration-context.xml

<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel">
    </int:chain>
<int-amqp:inbound-channel-adapter
            channel="transaction-inbound-channel"
            queue-names="sample.queue"
            concurrent-consumers="5"
            error-channel="failed-channel"
            connection-factory="rabbitConnectionFactory"
            mapped-request-headers="*" 
            transaction-manager="transactionManager" />

    <rabbit:connection-factory
            id="rabbitConnectionFactory"
            connection-factory="rcf"
            host="${spring.rabbitmq.host}"
            port="${spring.rabbitmq.port}"
            username="${spring.rabbitmq.username}"
            password="${spring.rabbitmq.password}"
            />
    <bean id="rcf" class="com.rabbitmq.client.ConnectionFactory">
        <property name="host" value="${spring.rabbitmq.host}"/>
        <property name="requestedHeartbeat" value="10" />
    </bean>

This is the stack trace:

WARN [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:526) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1062) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: javax.persistence.RollbackException: Transaction marked as rollbackOnly
    at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:74) ~[hibernate-entitymanager-4.3.11.Final.jar:4.3.11.Final]
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    ... 7 common frames omitted
2016-10-19 00:52:50,673 [SimpleAsyncTaskExecutor-1] INFO [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tags=[{amq.ctag-QBWGIY7co5vUpXwGDXDDBg=sample.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://test@999.99.99.999:9999/,2), acknowledgeMode=AUTO local queue size=0 

I don't want this to happen org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer

Can some one help me to what else I need to add to rethrow the AmqpRejectAndDontRequeueException. Thanks in advance.

回答1:

You don't show the flow.

If you are only invoking a single service, then don't add the transaction manager to the inbound channel adapter.

The transaction will start when the service is invoked; as long as the error flow on failed-channel does not throw an exception (or throws an AmqpRejectAndDontRequeueException) the message won't be requeued.

If you have multiple services that you want to participate in a single transaction, you can use an AOP advice to make transaction-inbound-channel transactional and the downstream flow will all run within that transaction.

Or, if you never want messages requeued, set defaultRequeueRejected to false on the inbound adapter's container (which you'll have to wire up as a <bean/>).