Storm - Conditionally consuming stream from kafka

Published 2019-06-02 22:45

Question:

I have a scenario where I am posting JSON to a Kafka instance and using a KafkaSpout to emit the stream to a bolt.

Now I would like to add an additional field (call it x) to my JSON message. If x is a, I would like the tuple to be consumed by boltA; if x is b, by boltB.

Is there a way to direct the stream to the proper bolt depending on the stream's contents?

Answer 1:

The simplest way should be to add a SplitBolt that consumes from the KafkaSpout, evaluates the field x, and forwards each tuple to one of two output streams:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    Object x = ... // get field x from the JSON payload in the tuple
    String streamId;
    if ("a".equals(x)) {
      streamId = "stream-xa";
    } else { // x == "b"
      streamId = "stream-xb";
    }
    // anchor the emit on the input tuple, then ack it
    collector.emit(streamId, input, input.getValues());
    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields schema = new Fields(...); // same schema the spout emits
    declarer.declareStream("stream-xa", schema);
    declarer.declareStream("stream-xb", schema);
  }
}
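Since the field extraction in SplitBolt is elided above, here is a minimal, Storm-free sketch of that routing logic that can be run standalone. The class and method names are hypothetical, and a real bolt would parse the JSON with a proper library such as Jackson rather than a regex:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of SplitBolt's decision logic: pull field x out of the
// JSON payload and map it to a stream id. Illustrative only -- use a real
// JSON parser in production code.
public class RoutingSketch {

    private static final Pattern X_FIELD = Pattern.compile("\"x\"\\s*:\\s*\"([^\"]*)\"");

    // Extract the value of field x from a JSON string (regex for brevity).
    static String extractX(String json) {
        Matcher m = X_FIELD.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Map the field value to the output stream id declared by the bolt.
    static String selectStream(String x) {
        return "a".equals(x) ? "stream-xa" : "stream-xb";
    }

    public static void main(String[] args) {
        String msg = "{\"x\":\"a\",\"payload\":\"hello\"}";
        System.out.println(selectStream(extractX(msg))); // prints stream-xa
    }
}
```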

When building your Topology, you connect BoltA to "stream-xa" and BoltB to "stream-xb":

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
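To see the fan-out behavior of this wiring without a running cluster, here is a toy, Storm-free mirror of it (all names illustrative): messages whose x is a reach only the boltA sink, everything else only boltB:

```java
import java.util.ArrayList;
import java.util.List;

// Toy mirror of the topology above: a splitter routes each message to
// exactly one of two sinks, just as boltA subscribes to "stream-xa" and
// boltB to "stream-xb". Storm itself does this routing via the declared
// streams; this class only illustrates the resulting data flow.
public class TopologyToy {
    static final List<String> boltA = new ArrayList<>();
    static final List<String> boltB = new ArrayList<>();

    // Mimics SplitBolt.execute: route by the value of field x.
    static void split(String x) {
        if ("a".equals(x)) {
            boltA.add(x); // delivered on "stream-xa"
        } else {
            boltB.add(x); // delivered on "stream-xb"
        }
    }

    public static void main(String[] args) {
        for (String x : new String[]{"a", "b", "a"}) {
            split(x);
        }
        System.out.println(boltA); // [a, a]
        System.out.println(boltB); // [b]
    }
}
```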

As an alternative, it should also be possible to inherit from KafkaSpout and emit to the two streams directly. However, that code is trickier to get right.