MassTransit: ensure message processing order when consuming multiple message types

Posted 2019-07-13 15:52

Question:

I'm new to MassTransit and I would like to understand if it can help with my scenario. I'm building a sample application implemented with a CQRS/event-sourcing architecture, and I need a service bus in order to dispatch the events created by the command stack to the query stack denormalizers.

Let's suppose we have a single aggregate in our domain, let's call it Photo, and two different domain events: PhotoUploaded and PhotoArchived.

Given this scenario, we have two different message types, and the default MassTransit behaviour is to create two different RabbitMQ exchanges: one for the PhotoUploaded message type and the other for the PhotoArchived message type.

Let's also suppose we have a single denormalizer called PhotoDenormalizer: this service will be a consumer of both message types, because the photo read model must be updated whenever a photo is uploaded or archived.
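
In code, the denormalizer would roughly be a single MassTransit consumer implementing both contracts; a simplified sketch (the message contracts and handler bodies are placeholders):

using System;
using System.Threading.Tasks;
using MassTransit;

public interface PhotoUploaded { Guid PhotoId { get; } }
public interface PhotoArchived { Guid PhotoId { get; } }

public class PhotoDenormalizer :
    IConsumer<PhotoUploaded>,
    IConsumer<PhotoArchived>
{
    public Task Consume(ConsumeContext<PhotoUploaded> context)
    {
        // insert/update the photo read model
        return Task.CompletedTask;
    }

    public Task Consume(ConsumeContext<PhotoArchived> context)
    {
        // mark the photo as archived in the read model
        return Task.CompletedTask;
    }
}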

Given the default MassTransit topology, there will be two different exchanges, so the message processing order cannot be guaranteed between events of different types. The only guarantee we have is that all the events of the same type will be processed in order; we cannot guarantee the processing order between events of different types (and notice that, given the semantics of the events in my example, the processing order matters).

How can I handle such a scenario? Is MassTransit suitable for my needs? Am I completely missing the point about domain event dispatching?

Answer 1:

Disclaimer: this is not an answer to your question, but rather a preventive message about why you should not do what you are planning to do.

Whilst message brokers like RMQ and messaging middleware libraries like MassTransit are perfect for integration, I strongly advise against using message brokers for event sourcing. I can refer you to my old answer Event-sourcing: when (and not) should I use Message Queue?, which explains the reasons behind it.

One of those reasons you have found yourself: event order will never be guaranteed.

Another obvious reason is that building read models from events published via a message broker effectively removes the possibility of replay: a new read model that needs to start processing events from the beginning of time cannot do so, because all it gets are the events being published now.

Aggregates form transactional boundaries, so every command needs to guarantee that it completes within one transaction. Whilst MT supports the transaction middleware, it only guarantees that you get a transaction for dependencies that support transactions, but not for context.Publish(@event) in the consumer body, since RMQ doesn't support transactions. You therefore stand a good chance of committing changes and not getting the events on the read side. So the rule of thumb for event stores is that you should subscribe to the stream of changes from the store, and not publish events from your code, unless those are integration events rather than domain events.
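
To make the failure mode concrete, here is a rough sketch; the command contract, the Photo aggregate and IPhotoRepository are made up for illustration, only IConsumer/ConsumeContext/Publish are MassTransit APIs. If the process dies between Save and Publish, the write side is updated but the read side never hears about the change:

using System;
using System.Threading.Tasks;
using MassTransit;

public interface ArchivePhoto { Guid PhotoId { get; } }
public interface PhotoArchived { Guid PhotoId { get; } }

public class Photo
{
    public Guid PhotoId { get; set; }
    public void Archive() { /* mutate aggregate state */ }
}

public interface IPhotoRepository
{
    Task<Photo> Load(Guid id);
    Task Save(Photo photo);
}

public class ArchivePhotoConsumer : IConsumer<ArchivePhoto>
{
    readonly IPhotoRepository _repository;

    public ArchivePhotoConsumer(IPhotoRepository repository) => _repository = repository;

    public async Task Consume(ConsumeContext<ArchivePhoto> context)
    {
        var photo = await _repository.Load(context.Message.PhotoId);
        photo.Archive();

        await _repository.Save(photo);                               // step 1: committed to the store
        await context.Publish<PhotoArchived>(new { photo.PhotoId }); // step 2: no shared transaction with step 1
    }
}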

For event sourcing, it is crucial that each read model keeps its own checkpoint in the stream of events it is projecting. Message brokers don't give you that kind of power, since the "checkpoint" is actually your queue, and as soon as a message is gone from the queue it is gone forever; there is no coming back.
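
By contrast, a projection that reads straight from the event store keeps its own checkpoint and can always restart from zero. A minimal sketch, assuming a hypothetical event store client and checkpoint store (neither is a MassTransit or RabbitMQ API):

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface ICheckpointStore
{
    Task<long> Load(string projection);
    Task Save(string projection, long position);
}

public interface IEventStoreClient
{
    // Events from the given position onwards, each paired with its position in the stream
    IAsyncEnumerable<(object Event, long Position)> SubscribeFrom(long position, CancellationToken cancellationToken);
}

public class PhotoReadModelProjection
{
    readonly IEventStoreClient _store;
    readonly ICheckpointStore _checkpoints;

    public PhotoReadModelProjection(IEventStoreClient store, ICheckpointStore checkpoints)
    {
        _store = store;
        _checkpoints = checkpoints;
    }

    public async Task Run(CancellationToken cancellationToken)
    {
        // Resume from wherever this particular read model stopped last time;
        // a brand-new read model starts from 0 and replays the whole history.
        var position = await _checkpoints.Load("photo-read-model");

        await foreach (var (@event, eventPosition) in _store.SubscribeFrom(position, cancellationToken))
        {
            await Apply(@event);
            await _checkpoints.Save("photo-read-model", eventPosition);
        }
    }

    Task Apply(object @event) => Task.CompletedTask; // update the read model here
}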

Concerning the actual question:

You can use the message topology configuration to set the same entity name for different messages, and then they'll be published to the same exchange, but that falls into the "abuse" category, as Chris wrote on that page. I haven't tried it, but you can definitely experiment. The message CLR type is part of the message metadata, so there shouldn't be any deserialization issues.
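
A sketch of what that could look like; the exchange name "photo-events" and the queue name are made-up values, and depending on your MassTransit version the host configuration may look slightly different:

using MassTransit;

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("localhost");

    // Publish both event types to the same exchange instead of one exchange per type
    cfg.Message<PhotoUploaded>(x => x.SetEntityName("photo-events"));
    cfg.Message<PhotoArchived>(x => x.SetEntityName("photo-events"));

    cfg.ReceiveEndpoint("photo-denormalizer", ep =>
    {
        ep.Consumer<PhotoDenormalizer>();
    });
});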

But again, putting messages in the same exchange won't give you any ordering guarantees, except that all messages will land in one queue for the consuming service.

You will at least have to set up the partitioning filter based on your aggregate id, to prevent multiple messages for the same aggregate from being processed in parallel. That, by the way, is also useful for integration. That's how we do it:

// ep is the receive endpoint configurator; appService and aggregateStore are
// our own application service and event store, not MassTransit types.
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
    => ep.Handler<T>(
        c => appService.Handle(c, aggregateStore),
        hc => hc.UsePartitioner(8, partition)); // 8 partitions, keyed by the supplied delegate

// Partition by the aggregate id, so messages for the same aggregate are never handled concurrently
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);