Kafka Streams - How to better control partitioning

Posted 2019-03-03 15:34

Question:

State stores in Kafka Streams are created internally. State stores are partitioned by key, but (to my knowledge) they do not allow providing any partitioning other than by key.

QUESTIONS

How can I control the number of partitions of the internal topic created for a state store? By default, how does the state store's topic infer its number of partitions and its partitioning, and how can these be overridden?

How can I work around this if I want to partition my state store by something other than the key of the incoming key-value record and still keep co-partitioning? In my case, I'd like to partition by something more specific than my regular key. For example, I have a

case class RegularKey(fieldA: String)

and I want to partition by

case class SpecificKey(fieldA: String, fieldB: String)

For my incoming source topic, I am using a HashPartitioner on the number of partitions.
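To see what co-partitioning requires, here is a toy model of hash partitioning in Scala. It is a sketch under stated assumptions, not Kafka's implementation: Kafka's default partitioner hashes the serialized key bytes with murmur2, while this model uses `String.hashCode`, and `partitionFor` and `encode` are hypothetical helpers standing in for a partitioner and a key serializer. It shows that partitioning by `RegularKey` ignores `fieldB`, and that the same key can land on different partitions when two topics have different partition counts.

```scala
object HashPartitioning {
  final case class RegularKey(fieldA: String)
  final case class SpecificKey(fieldA: String, fieldB: String)

  // Toy partitioner: mask the hash to a non-negative Int, then take it modulo
  // the partition count. Assumption: String.hashCode stands in for Kafka's
  // murmur2 hash of the serialized key bytes.
  def partitionFor(keyString: String, numPartitions: Int): Int =
    (keyString.hashCode & 0x7fffffff) % numPartitions

  // Hypothetical string encodings standing in for real key serializers.
  def encode(k: RegularKey): String = k.fieldA
  def encode(k: SpecificKey): String = s"${k.fieldA}|${k.fieldB}"

  def main(args: Array[String]): Unit = {
    val numPartitions = 6
    val keys = Seq(SpecificKey("a", "x"), SpecificKey("a", "y"), SpecificKey("a", "z"))
    keys.foreach { k =>
      // Partitioning by RegularKey ignores fieldB, so all three share a partition.
      val byRegular = partitionFor(encode(RegularKey(k.fieldA)), numPartitions)
      // Partitioning by SpecificKey may spread them across partitions.
      val bySpecific = partitionFor(encode(k), numPartitions)
      println(s"$k -> by RegularKey: $byRegular, by SpecificKey: $bySpecific")
    }

    // Co-partitioning needs the same partition count on both topics: the same
    // key can map to different partitions when the counts differ.
    val mismatched = (0 until 100).map(i => s"key-$i").count { k =>
      partitionFor(k, 6) != partitionFor(k, 4)
    }
    println(s"keys placed differently with 6 vs 4 partitions: $mismatched of 100")
  }
}
```

Co-partitioning holds only when both sides use the same key, the same serialization, the same hash function, and the same number of partitions; change any one of these and records with equal keys can end up in different partitions.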

Answer 1:

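The number of changelog topic partitions depends on the number of input topic partitions, and you cannot change it, because the state is sharded based on this number: Kafka Streams creates one task per input topic partition, each task owns one shard of the store, and each shard is backed by exactly one changelog partition.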
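The relationship between input partitions, tasks, and changelog partitions can be modeled in a few lines. This is a simplified sketch, not Kafka Streams code: `Task`, `Record`, and `run` are hypothetical names, the store is a plain in-memory map, and the "changelog" is only a buffer of writes tagged with a partition number. Each task processes exactly one input partition, so its store shard only ever writes to the changelog partition with the same number, which is why the changelog's partition count is tied to the input's.

```scala
import scala.collection.mutable

object ChangelogModel {
  // Hypothetical input record: the input partition it was read from, plus key and value.
  final case class Record(partition: Int, key: String, value: Long)

  // Toy task (assumption): one task per input partition, owning one store shard.
  final class Task(val partition: Int) {
    val store = mutable.Map.empty[String, Long]
    // (changelogPartition, key, newValue) writes emitted by this task.
    val changelog = mutable.ArrayBuffer.empty[(Int, String, Long)]

    def process(r: Record): Unit = {
      val updated = store.getOrElse(r.key, 0L) + r.value
      store.update(r.key, updated)
      // The backup write goes to the changelog partition with the task's own number.
      changelog += ((partition, r.key, updated))
    }
  }

  // Routes every record to the task that owns its input partition.
  def run(numInputPartitions: Int, records: Seq[Record]): Seq[Task] = {
    val tasks = (0 until numInputPartitions).map(new Task(_))
    records.foreach(r => tasks(r.partition).process(r))
    tasks
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(Record(0, "a", 1L), Record(1, "b", 2L), Record(0, "a", 3L), Record(2, "c", 4L))
    val tasks = run(3, records)
    val changelogPartitions = tasks.flatMap(_.changelog.map(_._1)).toSet
    println(s"input partitions: 3, changelog partitions written: ${changelogPartitions.toSeq.sorted}")
    tasks.foreach(t => println(s"task ${t.partition}: ${t.store}"))
  }
}
```

Restoring a shard after a failure works the same way in reverse: the task for input partition p replays changelog partition p, so adding or removing changelog partitions independently of the input would leave shards with no matching backup.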

If you want to partition the changelog topic by some attribute, you must make that attribute the key. In your case, you must set SpecificKey as the message key. Changing the partitioning is not allowed, because it would "break" Kafka Streams and lead to incorrect results.
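The following sketch models re-keying to `SpecificKey` and shows why placing records by anything other than the key gives wrong results. In the Kafka Streams DSL the re-keying step would be a `selectKey` (or `map`) before the stateful operation, which makes Kafka Streams route the records through an internal repartition topic; here that whole path is reduced to plain collections. `Event`, `countByPlacement`, `lookup`, `partitionFor`, and `encode` are hypothetical, and `String.hashCode` stands in for murmur2.

```scala
import scala.collection.mutable

object RekeyModel {
  final case class RegularKey(fieldA: String)
  final case class SpecificKey(fieldA: String, fieldB: String)
  // Hypothetical value type that carries fieldB.
  final case class Event(fieldB: String)

  type Store = mutable.Map[SpecificKey, Long]

  // Toy partitioner: String.hashCode stands in for murmur2.
  def partitionFor(keyString: String, numPartitions: Int): Int =
    (keyString.hashCode & 0x7fffffff) % numPartitions

  // Hypothetical string encoding standing in for a SpecificKey serializer.
  def encode(k: SpecificKey): String = s"${k.fieldA}|${k.fieldB}"

  // Re-keys each record to SpecificKey (the selectKey step), places it on the
  // partition chosen by `placement`, and counts it in that partition's store.
  def countByPlacement(
      records: Seq[(RegularKey, Event)],
      numPartitions: Int,
      placement: SpecificKey => Int
  ): IndexedSeq[Store] = {
    val stores = IndexedSeq.fill(numPartitions)(mutable.Map.empty[SpecificKey, Long])
    records.foreach { case (rk, ev) =>
      val sk = SpecificKey(rk.fieldA, ev.fieldB)
      val store = stores(placement(sk))
      store.update(sk, store.getOrElse(sk, 0L) + 1L)
    }
    stores
  }

  // A key-routed lookup: only the partition that the key hashes to is consulted.
  def lookup(stores: IndexedSeq[Store], k: SpecificKey): Option[Long] =
    stores(partitionFor(encode(k), stores.size)).get(k)

  def main(args: Array[String]): Unit = {
    val n = 4
    val records = Seq("x", "y", "x", "z").map(b => (RegularKey("a"), Event(b)))
    val keys = records.map { case (rk, ev) => SpecificKey(rk.fieldA, ev.fieldB) }.distinct

    // Placement derived from the full key: every lookup finds its count.
    val byKey = countByPlacement(records, n, sk => partitionFor(encode(sk), n))
    // Placement by a field other than the full key: lookups can miss.
    val byFieldB = countByPlacement(records, n, sk => partitionFor(sk.fieldB, n))

    keys.foreach { k =>
      println(s"$k -> keyed: ${lookup(byKey, k)}, placed by fieldB: ${lookup(byFieldB, k)}")
    }
  }
}
```

With placement derived from the full `SpecificKey`, every key-routed lookup finds its count; placing records by `fieldB` alone puts them where a lookup routed by the key does not look, which is the kind of silent inconsistency the answer warns about.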