How to discover and filter out duplicate records i

2019-05-21 00:48发布

问题:

Say you have a topic with a null key and the value is

{id:1, name:Chris, age:99}

Lets say you want to count up the number of people by name. You would do something like below:

nameStream.groupBy((key,value) -> value.getName())
           .count();

Now lets says it is valid you can get duplicate records and you can tell it is a duplicate based on the id.

For example:

{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}

Should result in a count of one and

   {id:1, name:Chris, age:99}
   {id:2, name:Chris, age:xx}

should result in a count of 2.

How would you accomplish this? I thought reduce would work, but misunderstood how that works.

回答1:

You can use more than one attribute for grouping. Create a custom key by concatenation and pass as key:

KTable<String,String> modifiedTable =  nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);

Above KTable will give the updated status for any record with the given name and ID. So for {id:1,name:Chris.....}, it will have only one record in KTable:

While in below case, both records will be present:

<Chris1,  {id:1, name:Chris, age:99}> 
<Chris2,   {id:2, name:Chris, age:xx}> 

Now you want to use the name attribute for count operation. So Change the key to name and re-group the table and perform count().

KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();

Here count() will be performed on top of KTable. KTable is the updated view for any given ID.
Hence for below input, modifiedTable will have 1 record at a time as updated value for key "Chris1" and you will get count=>1

<Chris,1> // Here key will be Chris1

Below input will result **count=>2

{id:1, name:Chris, age:99}  // Here key was be Chris1
{id:2, name:Chris, age:xx}  // Here key was be Chris2