We are using CQRS + ES. The event store is NEventStore (formerly JOliver EventStore). We have two aggregates handled by different commands. The projections of the second AR depend on data that the first AR's projections have already written to the read model. The problem is that when we run the software, everything goes so fast that the two aggregates are sometimes persisted in the event store with an identical datetime (the CommitStamp column). When replaying, we load the events from the beginning ordered by the CommitStamp column, but if the two streams share the same CommitStamp and are read in the wrong order, the read model projections blow up with exceptions.
Any idea how to solve this problem?
===============================
Here is the discussion about this problem at github
https://github.com/NEventStore/NEventStore/issues/170
===============================
EDIT: This is how we currently replay events. I looked into how GetFrom(...) works and it turns out the CommitStamp column is not used for ordering at all; in fact there is no commit order. So when I start replaying, it may return an event from today, then an event recorded 2 years ago, and so on.
public void ReplayEvents(Action<List<UncommittedEvent>> whatToDoWithEvents, DateTime loadEventsAfterDate)
{
    // GetFrom(DateTime) only filters commits by date; it does not guarantee any ordering.
    var eventPortion = store.Advanced.GetFrom(loadEventsAfterDate);
    var uncommittedEventStream = new UncommittedEventStream();
    foreach (var commit in eventPortion)
    {
        foreach (var eventMessage in commit.Events.ToList())
        {
            uncommittedEventStream.Append(new UncommittedEvent(eventMessage.Body));
        }
    }
    whatToDoWithEvents(uncommittedEventStream.ToList());
}
In NEventStore, the consistency boundary is the stream. As of version 3.2 (as @Marijn mentioned, issue #159) the CommitSequence column is used to order CommitMessages (and the EventMessages they contain) when reading from a stream, across all persistence engines.
EventMessage ordering is guaranteed on a per-stream basis. There is no implied ordering of messages across streams. Any actual ordering that may occur as a result of some aspect of the chosen persistence engine is accidental and must not be relied upon.
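For illustration, a minimal sketch of leaning on that per-stream guarantee (assuming NEventStore's usual IStoreEvents.OpenStream(streamId, minRevision, maxRevision) overload; adjust to the version you are on):

// Events read from a single stream come back in commit order, so a
// projection that only consumes one stream can rely on this ordering.
public void ProjectSingleStream(IStoreEvents store, string streamId, Action<object> applyToReadModel)
{
    using (var stream = store.OpenStream(streamId, 0, int.MaxValue))
    {
        foreach (var eventMessage in stream.CommittedEvents)
        {
            applyToReadModel(eventMessage.Body); // ordered within this stream only
        }
    }
}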
Guaranteeing ordering across streams would severely restrict the distributed-friendly aspects of the library. Even if we were to consider such a feature, it would have to work with all supported persistence engines, including the NoSQL stores.
If you are practising Domain Driven Design, where each stream represents an aggregate root, and you need to guarantee ordering across 2 or more aggregates, this points to a design issue in your domain model.
If your projections need to merge values from multiple sources (streams), you can rely on ordering intra-source, but you need to be flexible on ordering inter-source. You should also account for the possibility of duplicate messages, especially if you are replaying through an external bus or queue.
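Purely as an illustration (the event and read-model types below are hypothetical, not part of NEventStore), a projection that merges two streams could park events whose prerequisite data has not been projected yet and skip anything it has already applied:

using System;
using System.Collections.Generic;

// Hypothetical events and read model, for illustration only.
public class OrderCreated { public Guid OrderId; }
public class OrderItemAdded { public Guid OrderId; public string ItemId; public long Sequence; }

public class OrderReadModel
{
    public Guid OrderId;
    public List<string> Items = new List<string>();
    public HashSet<long> AppliedSequences = new HashSet<long>(); // duplicate detection
}

// Order-flexible and idempotent: missing prerequisites are parked and retried,
// duplicates are ignored.
public class OrderProjection
{
    private readonly Dictionary<Guid, OrderReadModel> readModel = new Dictionary<Guid, OrderReadModel>();
    private readonly List<OrderItemAdded> parked = new List<OrderItemAdded>();

    public void When(OrderCreated e)
    {
        if (!readModel.ContainsKey(e.OrderId))
            readModel[e.OrderId] = new OrderReadModel { OrderId = e.OrderId };

        // The prerequisite row now exists: retry anything parked for this order.
        foreach (var deferred in parked.FindAll(p => p.OrderId == e.OrderId))
        {
            parked.Remove(deferred);
            When(deferred);
        }
    }

    public void When(OrderItemAdded e)
    {
        OrderReadModel order;
        if (!readModel.TryGetValue(e.OrderId, out order))
        {
            parked.Add(e);   // the other stream has not been projected yet: defer instead of throwing
            return;
        }
        if (!order.AppliedSequences.Add(e.Sequence))
            return;          // duplicate delivery: already applied

        order.Items.Add(e.ItemId);
    }
}

The important point is not the data structures but the behaviour: the handler never assumes the OrderCreated event arrived first, and applying the same message twice leaves the read model unchanged.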
If you attempt to re-order multiple streams on the receiver end using a timestamp (CommitStamp), that will be fragile. Timestamps have a fixed resolution (ms, tick, etc). Even with a single writer, things may still happen 'at the same time'.
Damian added a checkpoint column to the database; this is in the current master branch. When the events are replayed with GetFromCheckpoint(int), the results are correct.
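A sketch of the replay method from the question rewritten against that API (GetFromCheckpoint(int) is taken from the description above; treat the exact signature as version-dependent and check it against the branch you build):

// Replay by checkpoint instead of CommitStamp. The checkpoint is a single
// monotonically increasing sequence, so identical CommitStamp values no longer
// affect the replay order.
public void ReplayEventsFromCheckpoint(Action<List<UncommittedEvent>> whatToDoWithEvents, int checkpoint)
{
    var commits = store.Advanced.GetFromCheckpoint(checkpoint);
    var uncommittedEventStream = new UncommittedEventStream();
    foreach (var commit in commits)
    {
        foreach (var eventMessage in commit.Events)
        {
            uncommittedEventStream.Append(new UncommittedEvent(eventMessage.Body));
        }
    }
    whatToDoWithEvents(uncommittedEventStream.ToList());
}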
At the database level, while the CommitStamp is fine for filtering, the CommitSequence column is the one that should guide the ordering.
As for what that translates to in terms of API calls on whatever version of the libs you're using -- I'll leave that as an exercise for you (or, if you fill in a code snippet and/or mention the version, perhaps someone else can step in).