I am trying to transform the string values obtained in my original stream "textlines" into JSONObject messages via mapValues, producing newStream, and then stream whatever ends up in newStream to a topic called "testoutput". But every time a message actually goes through the transformation block I get a NullPointerException, with a stack trace pointing only into the Kafka Streams libraries. I have no idea what is going on :((
P.S. When I fork/create a new Kafka stream from the original stream, does the new stream belong to the original builder? Since I need the builder to create the KafkaStreams object and start streaming, I am not sure whether I need to do anything else with the new stream other than specifying where it is going with .to("topic").
//Testing a Kafka Stream Application
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.json.JSONObject;

public class testStream {
    public static void main(String[] args) throws Exception {
        //Configurations
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        //Building the stream
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textlines = builder.stream("mytest2");

        //Print out the inputs, just for testing purposes
        textlines.foreach(new ForeachAction<String, String>() {
            public void apply(String key, String value) {
                for (int y = 0; y < value.length(); y++) {
                    System.out.print(value.charAt(y));
                }
                System.out.print("\n");
            }
        });

        //Transform String records to JSON objects
        KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String, JSONObject>() {
            @Override
            public JSONObject apply(String value) {
                JSONObject jsnobj = new JSONObject();
                //If the first 4 letters of the message are "xxxx", parse it into a
                //JSON object, otherwise create a dummy
                if (value.substring(0, 4).equals("xxxx")) {
                    jsnobj.put("Header_Title", value.substring(0, 4));
                    jsnobj.put("Data_Part", value.substring(4));
                } else {
                    jsnobj.put("Header_Title", "Not xxxx");
                    jsnobj.put("Data_Part", "None");
                }
                return jsnobj;
            }
        });

        //Specify target topic
        newStream.to("testoutput");

        //Off you go
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
@Michael: I fixed my code with your suggestion, thanks a lot. My objective was to convert the string values I read into JSON values.
From what I can tell, your problem is the line newStream.to("testoutput"). newStream has the type KStream<String, JSONObject>. However, your application is configured to use, by default, a String serde to serialize/deserialize record keys and record values (the KEY_SERDE_CLASS_CONFIG and VALUE_SERDE_CLASS_CONFIG settings in your Properties). This means that, when you do not provide explicit serdes in the to() call, Kafka Streams will attempt to write your newStream as a KStream<String, String> (rather than a KStream<String, JSONObject>) back to Kafka. What you need to do is provide explicit serdes in the to() call.
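For example, a minimal sketch of that call, assuming you have a Serde<JSONObject> instance named jsonSerde available (jsonSerde is not part of your code; you would have to supply it yourself):

        // Sketch: pass explicit serdes so Kafka Streams knows how to write the JSONObject values.
        // "jsonSerde" is an assumed Serde<JSONObject> instance that you must provide.
        newStream.to(Serdes.String(), jsonSerde, "testoutput");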
Unfortunately, Kafka doesn't include an out-of-the-box JSON serde yet (it's planned). Fortunately, you can look at (and copy) the example JSON serde included in Kafka's own demo applications for the Kafka Streams API: https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
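If you would rather keep working with org.json.JSONObject directly instead of the POJO-based serde from that example, a rough, illustrative sketch of such a serde could look like the following (the class name JsonObjectSerde is my own, not something shipped with Kafka):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

//Illustrative sketch only: a Serde<JSONObject> that round-trips JSON text as UTF-8 bytes.
//Error handling is kept minimal for brevity.
public class JsonObjectSerde implements Serde<JSONObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public Serializer<JSONObject> serializer() {
        return new Serializer<JSONObject>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public byte[] serialize(String topic, JSONObject data) {
                //Write the JSON text out as UTF-8 bytes
                return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<JSONObject> deserializer() {
        return new Deserializer<JSONObject>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public JSONObject deserialize(String topic, byte[] data) {
                //Parse the UTF-8 bytes back into a JSONObject
                return data == null ? null : new JSONObject(new String(data, StandardCharsets.UTF_8));
            }

            @Override
            public void close() { }
        };
    }
}

With something like that on the classpath, the to() call sketched above would become newStream.to(Serdes.String(), new JsonObjectSerde(), "testoutput"), and your default String serdes can stay as they are for reading the input topic.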