Say you have a topic with null keys, where each value looks like:
{id:1, name:Chris, age:99}
Let's say you want to count the number of people by name. You would do something like this:
nameStream.groupBy((key,value) -> value.getName())
.count();
Now let's say duplicate records can legitimately occur, and you can tell a record is a duplicate by its id.
For example:
{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}
should result in a count of 1, and
{id:1, name:Chris, age:99}
{id:2, name:Chris, age:xx}
should result in a count of 2.
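To pin down the requirement, here is a minimal plain-Java sketch (no Kafka involved) contrasting what groupBy(name).count() computes with what is wanted, namely the number of distinct ids per name. Person is a hypothetical record standing in for the topic's value type; the age field is kept as a String only because the question uses "xx" as a placeholder.

```java
import java.util.*;

// Plain-Java model of the requirement, NOT Kafka Streams code.
// Person is a hypothetical stand-in for the topic's value type.
class ExpectedCounts {
    record Person(int id, String name, String age) {}

    // What groupBy(name).count() computes: every record counts, duplicates included.
    static Map<String, Long> naiveCount(List<Person> records) {
        Map<String, Long> counts = new TreeMap<>();
        for (Person p : records) {
            counts.merge(p.name(), 1L, Long::sum);
        }
        return counts;
    }

    // What the question asks for: the number of distinct ids seen per name.
    static Map<String, Long> distinctIdCount(List<Person> records) {
        Map<String, Set<Integer>> idsByName = new TreeMap<>();
        for (Person p : records) {
            idsByName.computeIfAbsent(p.name(), k -> new HashSet<>()).add(p.id());
        }
        Map<String, Long> counts = new TreeMap<>();
        idsByName.forEach((name, ids) -> counts.put(name, (long) ids.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Person> duplicates = List.of(new Person(1, "Chris", "99"), new Person(1, "Chris", "xx"));
        List<Person> distinct = List.of(new Person(1, "Chris", "99"), new Person(2, "Chris", "xx"));
        System.out.println(naiveCount(duplicates));      // {Chris=2}
        System.out.println(distinctIdCount(duplicates)); // {Chris=1}
        System.out.println(distinctIdCount(distinct));   // {Chris=2}
    }
}
```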
How would you accomplish this? I thought reduce() would work, but I misunderstood how it works.
You can group by more than one attribute. Create a composite key by concatenating the attributes and use it as the grouping key:
// Person = the value class from the question (name assumed here)
KTable<String, Person> modifiedTable = nameStream
    .groupBy((key, value) -> value.getName() + value.getId())
    .reduce((aggVal, newVal) -> newVal); // keep the latest value for each name/ID key
The KTable above holds the current value for each name/ID combination.
So for the two duplicate {id:1, name:Chris, ...} records, the KTable will contain only one record, under key Chris1.
In the case below, however, both records will be present:
<Chris1, {id:1, name:Chris, age:99}>
<Chris2, {id:2, name:Chris, age:xx}>
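The deduplicating effect of groupBy(name + id).reduce(...) can be modelled in memory with Map.merge, which stores the first value for a key as-is and combines later values with the reducer, much like reduce() does per key. This is a sketch of the semantics only, not the Kafka Streams API; Person and reduceByNameAndId are hypothetical names.

```java
import java.util.*;
import java.util.function.BinaryOperator;

// In-memory model of groupBy(name + id).reduce(...), NOT the Kafka Streams API.
// Person is a hypothetical value type; keys are built as name + id, e.g. "Chris1".
class DedupTableModel {
    record Person(int id, String name, String age) {}

    static Map<String, Person> reduceByNameAndId(List<Person> records, BinaryOperator<Person> reducer) {
        Map<String, Person> table = new TreeMap<>();
        for (Person p : records) {
            String key = p.name() + p.id();
            // Absent key: store the record unchanged (reduce's first value).
            // Present key: combine old and new values with the reducer.
            table.merge(key, p, reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        BinaryOperator<Person> keepLatest = (aggVal, newVal) -> newVal;
        List<Person> duplicates = List.of(new Person(1, "Chris", "99"), new Person(1, "Chris", "xx"));
        List<Person> distinct = List.of(new Person(1, "Chris", "99"), new Person(2, "Chris", "xx"));
        System.out.println(reduceByNameAndId(duplicates, keepLatest).keySet()); // [Chris1]
        System.out.println(reduceByNameAndId(distinct, keepLatest).keySet());   // [Chris1, Chris2]
    }
}
```

Either reducer, (aggVal, newVal) -> aggVal or (aggVal, newVal) -> newVal, collapses duplicates to one entry per key; they differ only in whether the first or the latest value is kept.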
Now, to count by name, change the key back to the name, re-group the table, and call count():
KTable<String, Long> countTable = modifiedTable.groupBy((k, v) -> KeyValue.pair(v.getName(), v)).count();
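Here count() runs on top of the KTable, which is an updated view per name/ID key. When the value for an existing key is replaced, Kafka Streams removes the old value from its group's count before adding the new one, so a duplicate id does not increase the count.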
Hence, for the duplicate input above, modifiedTable holds only one record at a time for key "Chris1", and you get count => 1:
<Chris, 1> // the modifiedTable key is Chris1
The input below results in count => 2:
{id:1, name:Chris, age:99} // here the key is Chris1
{id:2, name:Chris, age:xx} // here the key is Chris2
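Why the re-grouped count ends at 1 rather than 2 can be shown with a small plain-Java model of counting on top of a table: each update to an existing key first retracts the old value from its group's count and then adds the new value. This mirrors the subtract-then-add behaviour of KGroupedTable aggregations, but the class and method names here (TableCountModel, upsert) are hypothetical and nothing below calls Kafka.

```java
import java.util.*;

// Plain-Java model (NOT Kafka Streams) of count() on top of a KTable:
// updating an existing key retracts the old value from its group before adding the new one.
class TableCountModel {
    record Person(int id, String name, String age) {}

    private final Map<String, Person> table = new HashMap<>(); // models modifiedTable (key = name + id)
    private final Map<String, Long> counts = new TreeMap<>();  // models countTable (key = name)

    void upsert(Person p) {
        String key = p.name() + p.id();
        Person old = table.put(key, p); // keep-latest, like reduce((aggVal, newVal) -> newVal)
        if (old != null) {
            // "subtract" step: retract the previous value from its name's count
            counts.merge(old.name(), -1L, Long::sum);
        }
        // "add" step: count the new value under its name
        counts.merge(p.name(), 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        TableCountModel dup = new TableCountModel();
        dup.upsert(new Person(1, "Chris", "99"));
        dup.upsert(new Person(1, "Chris", "xx"));
        System.out.println(dup.counts()); // {Chris=1}

        TableCountModel distinct = new TableCountModel();
        distinct.upsert(new Person(1, "Chris", "99"));
        distinct.upsert(new Person(2, "Chris", "xx"));
        System.out.println(distinct.counts()); // {Chris=2}
    }
}
```

In the duplicate case the second record replaces the first under key Chris1, so the count goes 1, then 0 (retract), then 1 (add); with two distinct ids nothing is retracted and the count reaches 2.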