I've been looking through a lot of the Kafka documentation for a Java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am on shaky ground there and am not yet confident that it is what I should use.
I have a Kafka/ZooKeeper service running without any trouble, and what I want to do is write a small example program that writes out something based on its input, but not a word count, as there are already so many examples of that.
As for sample data, I will be getting a string with the following structure:
Example data
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
Question
I want to be able to extract the 3 letter keywords and print them with a System.out.println. How do I get a string variable containing the input? I know how to apply regular expressions or even just search through the string to get the keywords.
Code
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // How do I assign the input from in-stream to the following variable?
    String variable = ?
}
I have ZooKeeper, Kafka, a producer and a consumer running, all hooked up to the same topic, so I basically want to see the same String appear on all of the instances (producer, consumer and stream).
If you use Kafka Streams, you need to apply functions/operators on your data streams. In your case, you create a KStream object, thus, you want to apply an operator to source.
Depending on what you want to do, there are operators that apply a function to each record in the stream independently (e.g., map()), and other operators that apply a function to multiple records together (e.g., aggregateByKey()). You should have a look at the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl and the examples: https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams
Thus, you never create local variables using Kafka Streams as you show in your example above, but rather embed everything in operators/functions that get chained together.
For example, if you want to print all input records to stdout, you could call source.foreach(...) with a ForeachAction whose apply(key, value) method passes the record to System.out.println.
Thus, after you start your application via streams.start(), it will consume the records from your input topic, and for each record of your topic a call to apply(...) is made, which prints the record on stdout.
Of course, a more native way of printing the stream to the console would be to use source.print() (which internally is basically the same as the foreach() operator described above, with an already given ForeachAction).
For your example with assigning the string to a local variable, you would need to put your code into apply(...) and do your regex stuff etc. there to "extract the 3 letter keywords".
The best way to express this would, however, be via a combination of flatMapValues() and print() (i.e., source.flatMapValues(...).print()). flatMapValues() is called for each input record (in your case, I assume the key will be null, so you can ignore it). Within your flatMapValues function, you apply your regex and, for each match, add the match to a list of values that you finally return.
The output of flatMapValues will be a KStream again, containing a record for each found keyword (i.e., the output stream is a "union" over all lists you return in ValueMapper#apply()). Finally, you just print your result to the console via print(). (Of course, you could also use a single foreach instead of flatMapValues + print, but this would be less modular.)
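To make the "apply your regex and collect the matches into a list" step concrete, here is a minimal sketch of the logic that would live inside ValueMapper#apply(). It is plain Java with no Kafka dependency, so it can be tried on its own. The class name KeywordExtractor and the method extractKeywords are hypothetical, and the pattern assumes that a "keyword" means a standalone run of exactly three uppercase letters, as in the sample string from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Kafka Streams.
class KeywordExtractor {

    // Assumption: a keyword is a standalone run of exactly three
    // uppercase ASCII letters (e.g. GPS, GEO, ACC).
    private static final Pattern KEYWORD = Pattern.compile("\\b[A-Z]{3}\\b");

    // Returns every keyword found in the record value, in order of appearance.
    // This is the list a flatMapValues() function would return per record.
    static List<String> extractKeywords(String value) {
        List<String> keywords = new ArrayList<>();
        if (value == null) {
            return keywords; // a record without a value yields no output records
        }
        Matcher matcher = KEYWORD.matcher(value);
        while (matcher.find()) {
            keywords.add(matcher.group());
        }
        return keywords;
    }

    public static void main(String[] args) {
        String sample = "This a sample string containing some keywords such as "
                + "GPS, GEO and maybe a little bit of ACC.";
        // Prints GPS, GEO and ACC, one per line.
        for (String keyword : extractKeywords(sample)) {
            System.out.println(keyword);
        }
    }
}
```

In the topology from the question, this would presumably be wired in as source.flatMapValues(value -> KeywordExtractor.extractKeywords(value)).print(), so that each keyword becomes its own output record. That last line is a sketch that has not been run against a real cluster.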