SourcePollingChannelAdapter with Transaction

Posted 2020-05-06 11:12

I would like to use a SourcePollingChannelAdapter with transaction propagation REQUIRED while polling, so that all operations are rolled back if an error occurs. The method setTransactionSynchronizationFactory is not documented... Thanks a lot for your help!

In XML I can do:

<int:poller fixed-rate="5000">
  <int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>

I would like to configure the same kind of transaction programmatically, with a SourcePollingChannelAdapter and a PeriodicTrigger, but I don't know how.

I have this:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();

When the source bean is called, an element is deleted from the database, and a message is created and sent to the output channel. If an error occurs in the flow after the output channel, I would like the database to be restored so that the element comes back: in effect, a simple transaction with propagation. I don't understand how to do that.

The output channel is:

<int:channel id="channel" >
    <int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
    url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
    reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String"  />

When the URL doesn't respond, an exception is thrown but no rollback is executed, although I have added a DefaultTransactionSynchronizationFactory and a TransactionInterceptor as you suggested :(

1 answer
祖国的老花朵
#2 · 2020-05-06 12:00

If I understand you correctly, you need to use this one: DefaultTransactionSynchronizationFactory

Here is a snippet showing how to configure it:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
        new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(mock(BeanFactory.class));
PollableChannel queueChannel = new QueueChannel();
syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
syncProcessor.setBeforeCommitChannel(queueChannel);
syncProcessor.setAfterCommitChannel(queueChannel);
syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));

DefaultTransactionSynchronizationFactory syncFactory =
        new DefaultTransactionSynchronizationFactory(syncProcessor);

adapter.setTransactionSynchronizationFactory(syncFactory);

Transaction boundaries are defined by the adviceChain of the SourcePollingChannelAdapter, so it should be configured like this:

TransactionInterceptor txAdvice =
        new TransactionInterceptor(transactionManager,
                new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
adapter.setAdviceChain(Collections.singletonList(txAdvice));

Now each poll is wrapped in a transaction, and your syncFactory will do the rest.
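One caveat worth checking against the configuration in the question: Spring transactions are bound to the thread that started them. With a direct channel, the downstream handler runs on the polling thread, so its exception reaches the transaction advice and can trigger a rollback. With an <int:queue /> channel, the send returns as soon as the message is queued, so the poll's transaction may commit before the HTTP gateway runs on its own thread. The following is a plain-JDK simulation of that difference, not Spring Integration code: the class and method names (PollTransactionSketch, pollInTransaction, runWithDirectChannel, runWithQueueChannel) are hypothetical, and the "transaction" is only a list of log entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-JDK simulation; none of these names are Spring Integration API.
final class PollTransactionSketch {

    // Simulates one poll wrapped by a transaction advice: begin, send, then commit or roll back.
    static void pollInTransaction(Consumer<String> outputChannel, List<String> log) {
        log.add("begin");
        try {
            String message = "row-1"; // stands in for the row the source deleted
            outputChannel.accept(message);
            log.add("commit");
        } catch (RuntimeException e) {
            log.add("rollback");
        }
    }

    // Direct hand-off: the failing handler runs on the polling thread, inside the transaction.
    static List<String> runWithDirectChannel() {
        List<String> log = new ArrayList<>();
        pollInTransaction(msg -> {
            throw new IllegalStateException("endpoint unreachable");
        }, log);
        return log;
    }

    // Queued hand-off: the send succeeds at once; the handler fails later on another thread.
    static List<String> runWithQueueChannel() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        pollInTransaction(queue::add, log);

        Thread consumer = new Thread(() -> {
            try {
                queue.take();
                throw new IllegalStateException("endpoint unreachable");
            } catch (IllegalStateException | InterruptedException e) {
                log.add("downstream failed after commit");
            }
        });
        consumer.start();
        try {
            consumer.join(); // wait so the log is complete before returning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runWithDirectChannel());
        System.out.println(runWithQueueChannel());
    }
}
```

In this simulation the direct case records begin then rollback, while the queued case records begin, commit, and only then the downstream failure. If the same holds for your flow, removing the <int:queue /> from the channel would be the first thing to try.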
