I have a scenario where I am posting json to a Kafka instance. I am then using a Kafka Spout to emit the stream to a bolt.
But now I would like to add an additional field (call it x) to my JSON message. If x is a, I would like it to be consumed by boltA; if x is b, I would like it to be consumed by boltB.
Is there a way to direct the stream to the proper bolt depending on the stream's contents?
The simplest way should be to add a SplitBolt that consumes from KafkaSpout, evaluates the field x, and forwards to different output streams:
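// Package names assume Storm 1.0 or later; older releases use backtype.storm.* instead.
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        Object x = ...; // get field x from input
        String streamId;
        if ("a".equals(x)) { // assumes x holds the string "a"; equals() compares values, == compares references
            streamId = "stream-xa";
        } else { // x is b
            streamId = "stream-xb";
        }
        // Anchor the outgoing tuple to the input, then ack the input.
        collector.emit(streamId, input, input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields(...); // same fields as the incoming tuples
        // Stream IDs must match the ones used in execute().
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xb", schema);
    }
}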
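Note that the else branch in execute sends every value other than a to the second stream. If x can take unexpected values, the routing decision could be pulled into a small lookup with an explicit fallback. The following is only a sketch in plain Java, without any Storm dependency: StreamRouter, streamFor, and the "stream-unknown" ID are hypothetical names that are not part of Storm, and it assumes x is compared by its string form.

```java
import java.util.Map;

// Hypothetical helper (not part of Storm): maps the value of field x to a stream ID.
final class StreamRouter {
    // Hypothetical fallback stream for values that are neither "a" nor "b".
    static final String UNKNOWN_STREAM = "stream-unknown";

    // Routing table: value of x -> output stream ID used by SplitBolt.
    private static final Map<String, String> ROUTES = Map.of(
            "a", "stream-xa",
            "b", "stream-xb");

    private StreamRouter() {}

    // Returns the stream ID for x, or the fallback if x is null or unrecognized.
    static String streamFor(Object x) {
        if (x == null) {
            return UNKNOWN_STREAM;
        }
        return ROUTES.getOrDefault(x.toString(), UNKNOWN_STREAM);
    }
}
```

In execute, the if/else could then become collector.emit(StreamRouter.streamFor(x), input, input.getValues()). The fallback stream would also have to be declared in declareOutputFields, and some bolt would need to subscribe to it (or the tuples on it are simply dropped).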
When building your topology, you connect BoltA to "stream-xa" and BoltB to "stream-xb":
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
As an alternative, it should also be possible to inherit from KafkaSpout and emit to two different streams directly. However, that code is trickier to get right.