Spring Integration JMS with JTA rollback when mess

2019-07-28 11:30发布

问题:

I am using Spring Integration with JTA support through Atomikos and JMS bound to different Webshpere MQ both inbound and outbound. The flow is the following:

  • JMS inbound channel adapter receive the message
  • some transformations
  • JMS outbound channel adapter to the output queue
  • when an error occurs, the errorChannel receives the message
  • exception type router routes unhandled errors to a custom rethrow service and handled ones to a recipient-list-router that sends them to 2 error queues

My problem is that I want the transaction committed even if the message goes downstream the errorChannel (on the handled exception case and to the error queues). As I understand, the rollback have to occur only if the Exception is rethrown (that's why I rethrow the unhandled ones), but in my case, the transaction rollbacks as soon as the message reaches the errorChannel (before it is routed elsewhere).

What am I doing wrong?

Configuration follows.

<jms:inbound-channel-adapter id="jms-in"
                             destination="input-queue"
                             connection-factory="inConnectionFactory"
                             channel="edi-inbound"
                             acknowledge="transacted">
    <poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
            fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
            >
        <transactional timeout="${process.tx.timeout-sec:60}"/>
    </poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>

<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
    <object-to-string-transformer/>
    <service-activator ref="inbound" method="service"/>
</chain>

<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
    <transformer ref="edi2xml-converter"/>
    <transformer ref="xml-mapper"/>
</chain>




<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
    <service-activator ref="outbound-message-composer" />
</chain>

<channel id="outbound-channel">
    <interceptors>
        <beans:ref bean="outbound-interceptor" />
    </interceptors>
</channel>

<recipient-list-router input-channel="outbound-channel">
    <recipient channel="file-outbound"/>
    <recipient channel="queue-outbound"/>
</recipient-list-router>



<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>



<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
                                   directory="${output.directory}"
                                   filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
                                   delete-source-files="true" />




<!-- notification outbound flow -->
<channel id="errorChannel">
    <interceptors>
        <wire-tap channel="logger"/>
    </interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>

<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
    <mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>

<recipient-list-router input-channel="error-notification-channel">
    <recipient channel="queue-outbound-error"/>
    <recipient channel="queue-inbound-error"/>
</recipient-list-router>

<chain input-channel="queue-outbound-error">
    <service-activator ref="outbound-error-composer" />
    <jms:outbound-channel-adapter id="jms-out-error"
                                  destination="error-output-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>

<chain input-channel="queue-inbound-error">
    <service-activator ref="error-notif-composer" />
    <jms:outbound-channel-adapter id="jms-in-error"
                                  destination="error-input-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>


<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>

Just to be complete, when the tx rollbacks on the error channel, both error queues receives the message in any case (as if the outbound adapters would not participate in the transaction), and the tx for the normal flow (when no error occurs) works perfectly.

回答1:

That's correct.

Because you use Polling Inbound Channel Adapter. It's logic is like:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
  ...
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
   ...
                catch (Exception e) {
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException) e;
                    }
                    else {
                        throw new MessageHandlingException(new ErrorMessage(e), e);
                    }
                }
            }
        });

Where you TX is a part of pollingTask proxy, as an AOP TransactionInterceptor.

The errorChannel is a part of this.taskExecutor's ErrorHandler. Therefore we can reach errorChannel only when we throw an exception from the pollingTask. Since we have TX there, it is ralled back, of course.

My point is: the error handling process in the Polling Inbound Channel Adapter is done outside of TX.

Consider to switch to the <int-jms:message-driven-channel-adapter>.