实现一个MongoDB的入站流量与Spring集成(Implementing a MongoDB I

2019-09-26 08:42发布

我们将有一个蒙戈集合包含多个工作单元。 我的想法是,该文件将有一个状态字段有四个选项:未加工,处理,DONE,未果。 Spring集成将配置从该数据库读取和处理存储在那里的消息。

入站蒙戈DSL流将基于未加工的收藏价值阅读:

MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...

这里的问题:如果我有几个工人机器来自同一数据库中读取我想,以防止他们在给我的轮询未处理的数据相同的行操作采用了保守值maxMessagesPerPoll或消息处理需要一段时间。

这似乎是正确的场地是用TransactionSynchronizationFactory定义ProcessBeforeCommit阶段更新状态加工和ProcessAfterCommit阶段更新状态DONE还是失败。 但是,添加此机制尚不清楚,我看的轮询器和事务管理器的API。 有例子,XML,但没有,我可以看到使用DSL。

我也想确保ProcessBeforeCommit发生在数据库的时间阅读,而不是后处理......不是吗? 另外,如果这不是一个工程师的解决方案,从收集蒙戈读取请随时提出一个更好的体系结构的最佳方式。

Answer 1:

不, ProcessBeforeCommitProcessAfterCommit非常接近回调。 他们在过程的结尾处明确发生。 让我们考虑你有这样的方法:

@Transactional
void foo() {}

当你调用该方法,交易进入方法体之前开始。 当我们执行后退出的方法体,进行beforeCommit回调。 它可能会失败,因为在我们的过程中的外部连接(DB?)可能会丢失。 且仅当它是确定的,我们进行了afterCommit

你问可以通过完成AbstractMessageSourceAdvice实现: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers 。 所以,在afterReceive()的实现,你可以更新您的文档的PROCESSING ,甚至决定返回null的消息,而不是:仅仅因为它在DB状态已经是PROCESSING 。 这样的Advice可以注入PollerSpec

/**
 * Specify AOP {@link Advice}s for the {@code pollingTask}.
 * @param advice the {@link Advice}s to use.
 * @return the spec.
 */
public PollerSpec advice(Advice... advice) {

DONEFAILED真的可以实现通过TransactionSynchronizationFactoryBean应用于PollerSpec

/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(


文章来源: Implementing a MongoDB Inbound Flow with Spring Integration