Adding 'for' loop with delay in kafka stre

Posted 2019-08-24 03:56

Below is the code I wrote, followed by the output it produces.

My input arrives as JSON arrays. To separate each JSON array into individual JSON objects, I wrote the following code:

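import java.util.Arrays;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// config is assumed to be the application's StreamsConfig or Properties, defined elsewhere
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
                // strip the enclosing array brackets
                .mapValues(value -> value.replace("[", ""))
                .mapValues(value -> value.replace("]", ""))
                // double the braces at each object boundary so the split keeps one brace per side
                .mapValues(value -> value.replaceAll("\\},\\{", "\\}\\},\\{\\{"))
                // emit one record per JSON object
                .flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
mstream.to("OUTTOPIC");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();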
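
To make the string handling easier to check on its own, here is a minimal sketch that applies the same replace and split steps to a plain String, without Kafka. The class name JsonArraySplitSketch, the method splitObjects, and the sample input are hypothetical and only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: it repeats the string
// operations used in the topology on a plain String.
class JsonArraySplitSketch {

    static List<String> splitObjects(String value) {
        // Remove every '[' and ']' character.
        String stripped = value.replace("[", "").replace("]", "");
        // Double the braces at each object boundary ...
        String marked = stripped.replaceAll("\\},\\{", "\\}\\},\\{\\{");
        // ... so that splitting on "},{" leaves one brace on each piece.
        return Arrays.asList(marked.split("\\},\\{"));
    }

    public static void main(String[] args) {
        // Made-up sample input with two objects.
        String input = "[{\"namespace\":\"a\",\"data\":1},{\"namespace\":\"b\",\"data\":2}]";
        for (String object : splitObjects(input)) {
            System.out.println(object);
        }
    }
}
```

Note that this approach removes every bracket in the value, so it would also alter objects that contain nested arrays or brackets inside strings, and it does not split objects separated by "}, {" with whitespace.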

All the JSON objects from an input array arrive at once, but I want them to arrive one by one. Do I need to add a delay, and if so, how should I do it? Could you please help me edit this code?

null    {"timestamp":"2017-12-05T13:52:43.560588466+05:30","namespace":"/intel/procfs/meminfo/active_anon","data":4183900160,"unit":"","tags":{"plugin_running_on":"AELAB110"},"version":4,"last_advertised_time":"2017-12-05T13:52:43.560658038+05:30"}
null    {"timestamp":"2017-12-05T13:52:43.560618652+05:30","namespace":"/intel/procfs/meminfo/mem_free","data":49385746432,"unit":"","tags":{"plugin_running_on":"AELAB110"},"version":4,"last_advertised_time":"2017-12-05T13:52:43.560660098+05:30"}
null    {"timestamp":"2017-12-05T13:52:43.560588466+05:30","namespace":"/intel/procfs/meminfo/active_anon","data":4183900160,"unit":"","tags":{"plugin_running_on":"AELAB110"},"version":4,"last_advertised_time":"2017-12-05T13:52:43.560658038+05:30"}
null    {"timestamp":"2017-12-05T13:52:43.560618652+05:30","namespace":"/intel/procfs/meminfo/mem_free","data":49385746432,"unit":"","tags":{"plugin_running_on":"AELAB110"},"version":4,"last_advertised_time":"2017-12-05T13:52:43.560660098+05:30"}

These JSON objects should arrive one after another. What should I change in my code?
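
To illustrate what I mean by a 'for' loop with a delay, here is a rough sketch in plain Java, outside of Kafka Streams. The class DelayedEmitSketch, the method emitWithDelay, the sample objects, and the 1000 ms delay are all hypothetical. The sink stands in for whatever would actually forward each object; how to hook this into the topology is exactly what I am unsure about, and sleeping inside a Streams operator would block the thread that runs it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: hands the objects to a sink one at a time,
// pausing between consecutive objects.
class DelayedEmitSketch {

    static void emitWithDelay(List<String> objects, long delayMillis, Consumer<String> sink) {
        for (int i = 0; i < objects.size(); i++) {
            sink.accept(objects.get(i));
            // No pause is needed after the last object.
            if (i < objects.size() - 1) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop emitting.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Made-up sample objects; the delay value is arbitrary.
        List<String> objects = Arrays.asList("{\"data\":1}", "{\"data\":2}", "{\"data\":3}");
        emitWithDelay(objects, 1000L, System.out::println);
    }
}
```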

Thanks in advance.
