When reading messages from Kafka using KafkaUtils.createDirectStream, the v1._1 member of the Tuple2 is null:
    KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    ).map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> v1) throws Exception {
            System.out.println(v1._1);
            return null;
        }
    });
The _2 member, however, contains the message that was passed to Kafka.
I have two questions:
1) Why is v1._1 null?
2) Is there a way to pass the topic name to Kafka (the same way the message is put into Kafka) so that v1._1 will contain the topic name?