We will have a Mongo collection contain multiple units of work. My idea is that the document will have a status field with four options: UNPROCESSED, PROCESSING, DONE, FAILED. Spring Integration will be configured to read from this db and process the messages stored there.
An inbound Mongo DSL flow will read from the collection based on a value of UNPROCESSED:
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...
Here's the problem: if I have a few worker machines reading from the same database I want to prevent them from operating on the same rows of UNPROCESSED data given my poller utilizes a conservative value for maxMessagesPerPoll
or message processing takes a while.
It seems the right venue is to use TransactionSynchronizationFactory
to define a ProcessBeforeCommit phase to update the status to PROCESSING, and a ProcessAfterCommit phase to update the status to DONE or FAILED. However, the mechanism to add this is not clear to me looking at the API for Pollers and TransactionManagers. There are examples in XML, but none that I can see utilizing DSL.
I would also want to ensure that ProcessBeforeCommit happens at the time of data base read and not after processing... does it? Also, if this is not the optimal way to engineer a solution that reads from a Mongo collection please feel free to suggest a better architecture.
No,
ProcessBeforeCommit
andProcessAfterCommit
are very close callbacks. They definitely happen in the end of your process. Let's consider you have a method like:When you call such a method, the transaction starts before entering a method body. When we exit a method body after its execution, the beforeCommit callback is performed. It may fail because during our process an external connection (DB ?) may be lost. And only if it is OK, we proceed to the
afterCommit
.What you are asking can be done via
AbstractMessageSourceAdvice
implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers. So, in theafterReceive()
implementation you can update your document to thePROCESSING
and even decided to returnnull
instead of the message: just because its status in the DB is alreadyPROCESSING
. Such anAdvice
can be injected into thePollerSpec
:The
DONE
andFAILED
really can be achieved viaTransactionSynchronizationFactoryBean
applied to thePollerSpec
: