我们将有一个蒙戈集合包含多个工作单元。 我的想法是,该文件将有一个状态字段有四个选项:未加工,处理,DONE,未果。 Spring集成将配置从该数据库读取和处理存储在那里的消息。
入站蒙戈DSL流将基于未加工的收藏价值阅读:
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...
这里的问题:如果我有几个工人机器来自同一数据库中读取我想,以防止他们在给我的轮询未处理的数据相同的行操作采用了保守值maxMessagesPerPoll
或消息处理需要一段时间。
这似乎是正确的场地是用TransactionSynchronizationFactory
定义ProcessBeforeCommit阶段更新状态加工和ProcessAfterCommit阶段更新状态DONE还是失败。 但是,添加此机制尚不清楚,我看的轮询器和事务管理器的API。 有例子,XML,但没有,我可以看到使用DSL。
我也想确保ProcessBeforeCommit发生在数据库的时间阅读,而不是后处理......不是吗? 另外,如果这不是一个工程师的解决方案,从收集蒙戈读取请随时提出一个更好的体系结构的最佳方式。
不, ProcessBeforeCommit
和ProcessAfterCommit
非常接近回调。 他们在过程的结尾处明确发生。 让我们考虑你有这样的方法:
@Transactional
void foo() {}
当你调用该方法,交易进入方法体之前开始。 当我们执行后退出的方法体,进行beforeCommit回调。 它可能会失败,因为在我们的过程中的外部连接(DB?)可能会丢失。 且仅当它是确定的,我们进行了afterCommit
。
你问可以通过完成AbstractMessageSourceAdvice
实现: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers 。 所以,在afterReceive()
的实现,你可以更新您的文档的PROCESSING
,甚至决定返回null
的消息,而不是:仅仅因为它在DB状态已经是PROCESSING
。 这样的Advice
可以注入PollerSpec
:
/**
* Specify AOP {@link Advice}s for the {@code pollingTask}.
* @param advice the {@link Advice}s to use.
* @return the spec.
*/
public PollerSpec advice(Advice... advice) {
该DONE
和FAILED
真的可以实现通过TransactionSynchronizationFactoryBean
应用于PollerSpec
:
/**
* Specify the {@link TransactionSynchronizationFactory} to attach a
* {@link org.springframework.transaction.support.TransactionSynchronization}
* to the transaction around {@code poll} operation.
* @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
* @return the spec.
*/
public PollerSpec transactionSynchronizationFactory(