The Kafka Streams engine maps each partition to exactly one worker (i.e. one instance of the Java app), so that all messages in that partition are processed by that worker. I have the following scenario and am trying to understand whether it can still work.
I have a topic A (with 3 partitions). The messages sent to it are partitioned randomly by Kafka (i.e. there is no key). Each message I send has a schema like the one below:
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
Since I have 3 partitions and the messages are distributed randomly across them, cars of the same model can end up in different partitions. For example:
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Toyota", color: "Blue", timeStampEpoch: 14334343342}
{carModel: "Toyota", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
Now let's say I wanted to count the total number of cars seen by carModel. I write a Kafka Streams application that listens to topic A, maps messages by carModel, i.e.
carStream.map((key, value) -> KeyValue.pair(value.get("carModel"), value))
and writes the total to another topic B, a message of the form
{carModel: "Nissan", totalCount: 5}
I then launch 3 instances of it, all part of the same consumer group. Kafka would then assign each partition to one of the workers, for example:
P1 --> Worker A
P2 --> Worker B
P3 --> Worker C
However, since each worker sees only one partition, it will see only partial information for each car model and will miss the data for that model in the other partitions.
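To make the concern concrete, here is a small plain-Java simulation (no Kafka involved). It is only a sketch: the class and method names are made up for illustration, and round-robin assignment stands in for keyless partitioning. It counts the nine sample messages separately per partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartialCounts {
    // Spread records over partitions round-robin, ignoring their content
    // (an assumed stand-in for keyless partitioning), then count per partition.
    public static List<Map<String, Integer>> countPerPartition(List<String> carModels,
                                                               int numPartitions) {
        List<Map<String, Integer>> counts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            counts.add(new TreeMap<>());
        }
        for (int i = 0; i < carModels.size(); i++) {
            counts.get(i % numPartitions).merge(carModels.get(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The carModel values of the nine sample messages, in order.
        List<String> models = List.of("Honda", "Honda", "Toyota", "Toyota", "Honda",
                                      "Nissan", "Nissan", "Honda", "Nissan");
        List<Map<String, Integer>> perPartition = countPerPartition(models, 3);
        for (int p = 0; p < perPartition.size(); p++) {
            System.out.println("P" + (p + 1) + " -> " + perPartition.get(p));
        }
        // P1 -> {Honda=1, Nissan=1, Toyota=1}
        // P2 -> {Honda=3}
        // P3 -> {Nissan=2, Toyota=1}
    }
}
```

There are 4 Honda messages in total, but no single partition (and hence no single worker) sees all of them.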
Question: Is my understanding correct?
If it is, I can imagine that I could re-partition (i.e. reshuffle) my data by carModel for this use case to work.
But I want to make sure I'm not misunderstanding how this works, and that Kafka doesn't in fact somehow take care of the re-partitioning itself after the mapping inside my application.
You can use a singleton object that has a synchronized increment(String carModel) method. Access this singleton object from 3 threads and increment the number of occurrences of each model.
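A minimal sketch of that suggestion follows. The class name CarModelCounter and the get method are assumptions added for illustration. Note that a singleton is shared only between threads inside one JVM; three separately launched application instances would each have their own copy and their own partial counts.

```java
import java.util.HashMap;
import java.util.Map;

public final class CarModelCounter {
    private static final CarModelCounter INSTANCE = new CarModelCounter();
    private final Map<String, Long> counts = new HashMap<>();

    private CarModelCounter() {}

    public static CarModelCounter getInstance() {
        return INSTANCE;
    }

    // Adds one occurrence of the model and returns the new total.
    public synchronized long increment(String carModel) {
        return counts.merge(carModel, 1L, Long::sum);
    }

    // Returns the current total for the model, or 0 if it was never seen.
    public synchronized long get(String carModel) {
        return counts.getOrDefault(carModel, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        CarModelCounter counter = CarModelCounter.getInstance();
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    counter.increment("Honda");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("Honda -> " + counter.get("Honda")); // prints "Honda -> 3000"
    }
}
```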
Kafka Streams will do the repartitioning of your data automatically. Your program will be something like:
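A minimal sketch of such a program, under several assumptions: it uses the newer DSL classes (StreamsBuilder, Grouped), which did not exist in v0.10.1; keys and values are plain Strings; the topic names "A" and "B" are taken from the question; and extractCarModel is a hypothetical helper standing in for real JSON parsing.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class CarCountTopology {
    private static final Pattern CAR_MODEL =
            Pattern.compile("\"?carModel\"?\\s*:\\s*\"([^\"]*)\"");

    // Hypothetical helper: pulls the carModel value out of the message text.
    public static String extractCarModel(String message) {
        Matcher m = CAR_MODEL.matcher(message);
        return m.find() ? m.group(1) : "unknown";
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
               // Setting a new key marks the stream as needing repartitioning.
               .map((key, value) -> KeyValue.pair(extractCarModel(value), value))
               // Grouping by the new key is where the repartition topic gets created.
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count()
               .toStream()
               .to("B", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Printing build().describe() should show an internal topic whose name ends in "-repartition" between the map and the count.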
For this pattern, Kafka Streams detects that you set a new key in the map() step and thus automatically creates a topic in the background to repartition the data for the groupByKey().count() step (as of v0.10.1 via KAFKA-3561).
Basically, the program above is executed as if you had explicitly written the re-keyed stream to an intermediate topic and read it back before the aggregation. Kafka Streams automatically inserts this repartitioning step and thus computes the correct result.
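To see why routing by the new key fixes the count, here is a plain-Java simulation (no Kafka involved). It is a sketch under assumptions: the class and method names are made up, and String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. The point is only that equal keys always map to the same partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RepartitionByKey {
    // Pick a partition from the key alone, so equal keys always collide.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route each record by its carModel key, then count per partition.
    public static List<Map<String, Integer>> countAfterRepartition(List<String> carModels,
                                                                   int numPartitions) {
        List<Map<String, Integer>> counts = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            counts.add(new TreeMap<>());
        }
        for (String model : carModels) {
            counts.get(partitionFor(model, numPartitions)).merge(model, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> models = List.of("Honda", "Honda", "Toyota", "Toyota", "Honda",
                                      "Nissan", "Nissan", "Honda", "Nissan");
        List<Map<String, Integer>> perPartition = countAfterRepartition(models, 3);
        for (int p = 0; p < perPartition.size(); p++) {
            System.out.println("P" + (p + 1) + " -> " + perPartition.get(p));
        }
    }
}
```

Whichever partition a model lands on, its full count (Honda 4, Nissan 3, Toyota 2) shows up in exactly one place, so the worker owning that partition computes the complete total.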