Kafka Streams: Any guarantees on ordering of saves

2019-08-18 00:28发布

问题:

We have a Kafka Streams Java topology built with the Processor API.

In the topology, we have a single processor, that saves to multiple state stores.

As we use at_least_once, we would expect to see some inconsistencies between the state stores - e.g. an incoming record results in writes to both state store A and B, but a crash between the saves results in only the save to store A getting written to the Kafka change log topic.

  1. Are we guaranteed that the order in which we save will also be the order in which the writes to the state stores happen? E.g. if we first save to store A and then to store B, we can of course have situation where the write to both change logs succeeded, and a situation where only the write to change log A was completed - but can we also end up in a situation where only the write to change log B was completed?

  2. What situations will result in replays? A crash of course - but what about rebalances, new broker partition leader, or when we get an "Offset commit failed" error (The request timed out)?

  3. A while ago, we tried using exactly_once, which resulted in a lot of error messages, that didn't make sense to us. Would exactly_once give us atomic writes across multiple state stores?

回答1:

Ad 3. According to The original design document on exactly-once support in Kafka Streams I think with eaxctly_once you get atomic writes across multiple state stores

When stream.commit() is called, the following steps are executed in order:

  1. Flush local state stores (KTable caches) to make sure all changelog records are sent downstream.
  2. Call producer.sendOffsetsToTransactions(offsets) to commit the current recorded consumer’s positions within the transaction. Note that although the consumer of the thread can be shared among multiple tasks hence multiple producers, task’s assigned partitions are always exclusive, and hence it is safe to just commit the offsets of this tasks’ assigned partitions.
  3. Call producer.commitTransaction() to commit the current transaction. As a result the task state represented as the above triplet is committed atomically.
  4. Call producer.beginTransaction() again to start the next transaction.