How do I transform/fork a Kafka stream and send it

2019-07-25 08:18发布

I am Trying to transform the string value obtained in my original stream "textlines" into JSONObject Messages using the function "mapValues" into newStream. Then stream whatever I get in newStream onto a topic called "testoutput". But everytime a message actually goes through the transformation block I get a NullPointerException with errors pointing only into kafka stream libraries. Have no idea what is going on :((

P.S. When I fork/create a new kafka stream from the original stream, does the new stream belong to the original builder? Since I need builder to create the KafkaStreams Object and start streaming I am not sure if I need to do something else with the new stream other than just specifying where its going .to("topic")

//Testing a Kafka Stream Application
public class testStream {

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    //Building Stream
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textlines ="mytest2"); 

    //Printout The Inputs just for testing purposes
    textlines.foreach(new ForeachAction<String, String>(){
        public void apply(String key, String value){
            for(int y=0; y<value.length(); y++){

    //Transform String Records to JSON Objects
    KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
        public JSONObject apply(String value) {

            JSONObject jsnobj = new JSONObject();

            //If the first 4 letters of the message is "xxxx" then parse it to a 
            //JSON Object, otherwise create a dummy
            if(value.substring(0, 4).equals("xxxx")){               
                jsnobj.put("Header_Title", value.substring(0, 4));
                jsnobj.put("Data_Part", value.substring(4));
                jsnobj.put("Header_Title", "Not xxxx");
                jsnobj.put("Data_Part", "None");
            return jsnobj;

    //Specify target"testoutput");

    //Off you go
    KafkaStreams streams=new KafkaStreams(builder, props);


2楼-- · 2019-07-25 08:31

@Michael: I fixed my code with your suggestion. thanks a lot. My objective was to solve the read string to json value.

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> textLines ="input-topic-name");
    // do stuff

    Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    Serializer<JsonNode> jsonSerializer = new JsonSerializer();

    Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> newStream = textLines.mapValues(new ValueMapper<String,JsonNode>(){
        public JsonNode apply(String value) {

            JSONObject jsnObj = new JSONObject();

            //If the first 4 letters of the message is "xxxx" then parse it to a 
            //JSON Object, otherwise create a dummy
                jsnObj.put("Header_Title", value.toString());

                    ObjectMapper objectMapper = new ObjectMapper();

                    JsonNode json_value = null;
                    try {
                        json_value = objectMapper.readTree(jsnObj.toString());
                    } catch (IOException e) {
                        // TODO Auto-generated catch block

            return json_value;

    });, jsonSerde, "json-output");  
3楼-- · 2019-07-25 08:43

From what I can tell your problem is this line:"testoutput");

newStream has the type KStream<String, JSONObject>.

However, your application is configured to use, by default, a String serde to serialize/deserialize record keys and record values:

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

This means that, when you do not provide explicit serdes in the to() call, Kafka Streams will attempt to write your newStream as KStream<String, String> (rather than KStream<String, JSONObject>) back to Kafka.

What you need to do is to provide explicit serdes in the to() call:

// Sth like this, myJsonSerde, "testoutput");

Unfortunately, Kafka doesn't include an out-of-the-box JSON serde yet (it's planned). Fortunately, you can look at (and copy) the example JSON serde included in Kafka's own demo applications for the Kafka Streams API:

登录 后发表回答