How-to make an arbitrary part of a message flow (p

2019-05-26 08:58发布

I would like to make a flow initiated by syslog inbound-channel-adapter messages transactional. Unfortunately the adapter does not take a poller which could be made transactional (the typical approach used in many examples with other inbound adapters).

Is there any workaround?

EDIT

After some thinking I realized that my intent is a little different than initially described (hence change of the title). Basically all I want to do is some simple and straight-forward way of making an arbitrary part (starting from some arbitrary channel in the flow) of my message flow pseudo-transactional. Meaning - I want to execute some custom code if the flow completes without any exceptions (but note that I don't want my custom pseudo commit code to be a part (step) of a flow itself). And I want to execute some custom code if any exception occured.

Semantics of using TransactionSynchronizationFactory would suite me very well.

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

The only problem is how-to wire it together with the rest of the flow. What I tried is to define intermediate dummy service-activator endpoint that receives messages from the channel where I want the transaction to begin. And then add transactional poller to that service-activator. But this approach has problems of its own because in order to use poller you have to define the incoming channel as a queue channel which seem to make execution of the flow in a separate thread (or at least I observed some async behaveour).

1条回答
何必那么认真
2楼-- · 2019-05-26 09:37

Any flow from a message-driven adapter can be run in the scope of a transaction by making the adapter's channel start a transaction:

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="send"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <aop:advisor advice-ref="txAdvice" pointcut="bean(fromSyslog)"/>
</aop:config>

<int:channel id="fromSyslog" />

The channel (and all downstream channels) must be a DirectChannel (no queue channels, or task executors). After an async handoff to another thread, the transaction will commit.

I am sure you understand, but for the benefit of other readers, this does not make the syslog adapter itself transactional, just the downstream flow.

EDIT

Another technique is to use a mid-flow transactional gateway...

@Transactional
public interface TxGate {

    void oneWay(Message<?> message);

}

<int:service-activator input-channel="fromSyslog" ref="txGate" />

<int:gateway id="txGate" service-interface="foo.TxGate"
     default-request-channel="txSyslog" error-channel="foo" />

That way, you can handle an exception within the scope of the transaction and decide whether or not to commit (throwing an exception from the error flow will rollback).

The void return is important; since the downstream flow doesn't return a reply.

EDIT2

In response to your edited question.

So, it seems your issue with the solutions I provided (specifically in the mid-flow transactional gateway) only allows you to take some action if there is an error when you also want to take some (different) action after success.

There are two ways to do that.

  1. Make the last channel in the subflow a publish-subscribe-channel; add a second consumer (use order to explicitly define the order in which they are called) and take your 'success' action on the second subscriber - he won't be called after an exception (by default) so you'd continue to handle the exception case on the gateway's error channel.
  2. Use a ChannelInterceptor. Start the transaction in preSend(). The afterSendCompletion() method will be invoked after the subflow completes. The presence (or not) of the Exception argument is populated if the subflow throws an exception.

We could consider adding such an interceptor to the framework, if you want to consider contributing it.

查看更多
登录 后发表回答