I have a Kafka Streams application waiting for records to be published on the topic user_activity. It receives JSON data, and depending on the value of a particular key I want to route the stream to different topics.
This is my Streams app code:
    KStream<String, String> source_user_activity = builder.stream("user_activity");
    source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            System.out.println("value: " + value);
            ArrayList<String> keywords = new ArrayList<String>();
            try {
                JSONObject send = new JSONObject();
                JSONObject received = new JSONObject(value);
                send.put("current_date", getCurrentDate().toString());
                send.put("activity_time", received.get("CreationTime"));
                send.put("user_id", received.get("UserId"));
                send.put("operation_type", received.get("Operation"));
                send.put("app_name", received.get("Workload"));
                keywords.add(send.toString());
                // apply regex to value and for each match add it to keywords
            } catch (Exception e) {
                // TODO: handle exception
                System.err.println("Unable to convert to json");
                e.printStackTrace();
            }
            return keywords;
        }
    }).to("user_activity_by_date");
In this code, I want to check the operation type and, depending on its value, push the record to the relevant topic.
How can I achieve this?
EDIT:
I have updated my code to this:
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
    KStream<String, String>[] branches = source_o365_user_activity.branch(
            (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
            (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
            (key, value) -> true
    );
    branches[0].to("o365_sharing_set_by_date");
    branches[1].to("o365_added_to_secure_link_by_date");
    branches[2].to("o365_user_activity_by_date");
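You can use the branch method to split your stream. It takes one or more predicates and returns an array of streams, one per predicate: each record is sent to the first branch whose predicate matches it, and records that match no predicate are dropped. An example of this is available in kafka-streams-examples.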
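As a sketch of branching on the parsed Operation field instead of substring matching (which breaks as soon as the JSON contains whitespace such as "Operation": "SharingSet"), the following assumes kafka-streams 2.4 or later and org.json on the classpath, as in the question. The class UserActivityRouter and the helpers field() and isFile() are hypothetical names. Note that branch() has been deprecated since Kafka 2.8.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

public class UserActivityRouter {

    // Hypothetical helper: reads a top-level field as a string. Returns "" when the
    // value is not valid JSON or the field is missing, so such records fall through
    // to the catch-all branch instead of crashing the stream thread.
    static String field(String json, String name) {
        try {
            return new JSONObject(json).optString(name, "");
        } catch (Exception e) {
            return "";
        }
    }

    static boolean isFile(String value) {
        return "File".equals(field(value, "ItemType"));
    }

    @SuppressWarnings({"unchecked", "deprecation"})
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("o365_user_activity",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Predicates are evaluated in order and each record goes to the FIRST match,
        // so the catch-all predicate must come last.
        KStream<String, String>[] branches = source.branch(
                (key, value) -> "SharingSet".equals(field(value, "Operation")) && isFile(value),
                (key, value) -> "AddedToSecureLink".equals(field(value, "Operation")) && isFile(value),
                (key, value) -> true);

        Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
        branches[0].to("o365_sharing_set_by_date", produced);
        branches[1].to("o365_added_to_secure_link_by_date", produced);
        branches[2].to("o365_user_activity_by_date", produced);
        return builder.build();
    }
}
```

Parsing the JSON once per predicate is simple but repeats work; if throughput matters, a mapValues step that parses once before branching is a reasonable alternative.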
The original KStream.branch method is inconvenient because it mixes arrays and generics, and because it forces you to use 'magic numbers' to extract the right branch from the result (see, e.g., the KAFKA-5488 issue). Starting with spring-kafka 2.2.4, the KafkaStreamBrancher class is available, which makes branching more convenient. There is also KIP-418, so there is a chance that a similar API will appear in Kafka itself.
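KIP-418 has since been implemented: from Kafka 2.8, KStream#split() returns a BranchedKStream whose branches are declared with Branched, and KStream#branch is deprecated. A minimal sketch of the question's routing with that API, assuming kafka-streams 2.8 or later and keeping the question's substring predicates unchanged; the class name SplitRouter and the "activity-" branch name prefix are hypothetical. spring-kafka's KafkaStreamBrancher follows a similar chained, one-consumer-per-branch style.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class SplitRouter {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());

        KStream<String, String> source = builder.stream("o365_user_activity",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Each predicate sits next to the code that handles its branch, so there
        // are no array indexes ("magic numbers") to keep in sync with the predicates.
        source.split(Named.as("activity-"))
              .branch((key, value) -> value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File"),
                      Branched.withConsumer(ks -> ks.to("o365_sharing_set_by_date", produced)))
              .branch((key, value) -> value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File"),
                      Branched.withConsumer(ks -> ks.to("o365_added_to_secure_link_by_date", produced)))
              // Records matching neither predicate land here instead of being dropped.
              .defaultBranch(Branched.withConsumer(ks -> ks.to("o365_user_activity_by_date", produced)));

        return builder.build();
    }
}
```

Ending the chain with noDefaultBranch() instead of defaultBranch(...) would drop unmatched records, mirroring the behaviour of branch() without a catch-all predicate.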