Event sourcing with Kafka streams

Published 2020-06-23 06:24

I'm trying to implement a simple CQRS/event sourcing proof of concept on top of Kafka streams (as described in https://www.confluent.io/blog/event-sourcing-using-apache-kafka/)

I have 4 basic parts:

  1. commands topic, which uses the aggregate ID as the key for sequential processing of commands per aggregate
  2. events topic, to which every change in aggregate state is published (again, the key is the aggregate ID). This topic has a retention policy of "never delete"
  3. A KTable to reduce aggregate state and save it to a state store

    events topic stream ->
    group to a Ktable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    
  4. commands processor - the commands stream, left-joined with the aggregate state KTable. For each entry in the resulting stream, apply a function (command, state) => events and publish the produced events to the events topic
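The topology above could be sketched in plain Java as follows. This is not the actual Streams DSL — the types are hypothetical, a HashMap keyed by aggregate ID stands in for the materialized state store, and `reduce()` plays the role of the KTable aggregator:

```java
import java.util.*;
import java.util.function.BiFunction;

public class CommandProcessorSketch {
    record Command(String aggregateId, String type) {}
    record Event(String aggregateId, String type) {}
    record State(String aggregateId, boolean deleted) {}

    // Stand-in for the materialized state store (step 3).
    private final Map<String, State> stateStore = new HashMap<>();

    // (command, state) => events, as in step 4; state may be null (left join).
    private final BiFunction<Command, State, List<Event>> handler = (cmd, state) -> {
        if (state != null && state.deleted()) return List.of(); // business rule: reject
        return List.of(new Event(cmd.aggregateId(),
                cmd.type().equals("Delete") ? "Deleted" : "Modified"));
    };

    // Step 3: fold one event into the current snapshot, as the KTable reduce would.
    private State reduce(State current, Event e) {
        boolean deleted = (current != null && current.deleted())
                || e.type().equals("Deleted");
        return new State(e.aggregateId(), deleted);
    }

    public List<Event> process(Command cmd) {
        List<Event> events = handler.apply(cmd, stateStore.get(cmd.aggregateId()));
        // In the real topology this state update happens asynchronously via the
        // events topic; here it is applied synchronously for illustration.
        for (Event e : events)
            stateStore.put(e.aggregateId(),
                    reduce(stateStore.get(e.aggregateId()), e));
        return events;
    }
}
```

Note that because the sketch applies the reduce synchronously, it behaves the way one would want; in the real topology the KTable lags behind the events topic, which is exactly the race described in the question below.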

The question is - is there a way to make sure I have the latest version of the aggregate in the state store?

I want to reject a command if it violates business rules (for example - a command to modify the entity is not valid if the entity was marked as deleted). But if a DeleteCommand is published and a ModifyCommand follows right after it, the delete command will produce the DeletedEvent, yet when the ModifyCommand is processed, the state loaded from the state store might not reflect that yet, and conflicting events will be published.

I don't mind sacrificing command processing throughput; I'd rather get the consistency guarantees (since everything is keyed by the aggregate ID and should end up in the same partition).

Hope that was clear :) Any suggestions?

3 Answers
再贱就再见
#2 · 2020-06-23 07:01

I don't think Kafka is a good fit for CQRS and event sourcing yet, the way you described it, because it lacks a (simple) way of ensuring protection from concurrent writes. This article discusses this in detail.

By "the way you described it" I mean the fact that you expect a command to generate zero or more events or to fail with an exception; this is classical CQRS with event sourcing. Most people expect this kind of architecture.

You could still have event sourcing, but in a different style. Your command handlers could yield an event for every command that is received (e.g. DeleteWasAccepted). Then an event handler could eventually handle that event in an event-sourced way (by rebuilding the aggregate's state from its event stream) and emit other events (e.g. ItemDeleted or ItemDeletionWasRejected). So commands are fire-and-forget, sent asynchronously; the client does not wait for an immediate response, but instead waits for an event describing the outcome of its command.

An important aspect is that the event handler must process events from the same aggregate serially (exactly once and in order). This can be implemented using a single Kafka consumer group. This architecture is described in this video.
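The fire-and-forget style described above could be sketched like this. All names (DeleteWasAccepted, ItemDeletionWasRejected, etc.) follow the examples in this answer; the per-aggregate list stands in for the events topic partition, and the event handler is invoked synchronously here to simulate serial, in-order delivery per aggregate:

```java
import java.util.*;

public class FireAndForgetSketch {
    // Per-aggregate event log, standing in for the events topic partition.
    private final Map<String, List<String>> eventLog = new HashMap<>();

    // Command handler: never fails, just records that the command was accepted.
    public void handleCommand(String aggregateId, String command) {
        String accepted = command + "WasAccepted";
        eventLog.computeIfAbsent(aggregateId, k -> new ArrayList<>()).add(accepted);
        onEvent(aggregateId, accepted); // serial, in-order delivery per aggregate
    }

    // Event handler: rebuild aggregate state from its event stream,
    // then emit an event describing the real outcome.
    private void onEvent(String aggregateId, String event) {
        List<String> log = eventLog.get(aggregateId);
        boolean deleted = log.contains("ItemDeleted"); // state rebuilt from events
        String outcome = switch (event) {
            case "DeleteWasAccepted" -> deleted ? "ItemDeletionWasRejected" : "ItemDeleted";
            case "ModifyWasAccepted" -> deleted ? "ItemModificationWasRejected" : "ItemModified";
            default -> null;
        };
        if (outcome != null) log.add(outcome);
    }

    public List<String> events(String aggregateId) {
        return eventLog.getOrDefault(aggregateId, List.of());
    }
}
```

With this flow, a Delete followed by a Modify on the same aggregate yields ItemDeleted and then ItemModificationWasRejected — the client learns the outcome from the events, not from a synchronous response.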

干净又极端
#3 · 2020-06-23 07:09

Please read this article by my colleague Jesper. Kafka is a great product, but actually not a good fit at all for event sourcing:

https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

forever°为你锁心
#4 · 2020-06-23 07:12

A possible solution I came up with is to implement a sort of optimistic locking mechanism:

  1. Add an expectedVersion field on the commands
  2. Use the KTable Aggregator to increase the version of the aggregate snapshot for each handled event
  3. Reject commands if the expectedVersion doesn't match the snapshot's aggregate version

This seems to provide the semantics I'm looking for.
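The three steps above could be sketched like this (hypothetical types; a map stands in for the snapshot store, and the aggregator step is inlined into the handler):

```java
import java.util.*;

public class OptimisticLockSketch {
    record Snapshot(long version, boolean deleted) {}
    record Command(String aggregateId, String type, long expectedVersion) {}

    private final Map<String, Snapshot> store = new HashMap<>();

    // Returns the emitted event type, or null if the command is rejected.
    public String process(Command cmd) {
        Snapshot snap = store.getOrDefault(cmd.aggregateId(), new Snapshot(0, false));
        if (cmd.expectedVersion() != snap.version()) return null; // stale: reject (step 3)
        if (snap.deleted()) return null;                          // business rule
        boolean deleted = cmd.type().equals("Delete");
        // Aggregator step: each handled event bumps the snapshot version (step 2).
        store.put(cmd.aggregateId(), new Snapshot(snap.version() + 1, deleted));
        return cmd.type().equals("Delete") ? "DeletedEvent" : "ModifiedEvent";
    }
}
```

In the race from the question, a ModifyCommand issued against version 1 arrives after the DeleteCommand has already advanced the snapshot to version 2, so the version check rejects it even before the business rule is evaluated.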
