I am building a Java Spring application using Storm 1.1.2 and Kafka 0.11 to be launched in a Docker container.
Everything in my topology works as planned, but under high load the Kafka consumer lag keeps growing over time.
My KafkaSpoutConfig:
KafkaSpoutConfig<String, String> spoutConf =
    KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
        .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
        .build();
My topology is then set up as follows:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(), 25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
The RouterBolt (which extends BaseRichBolt) runs one very simple switch statement and then uses a local KafkaProducer object to send a new message to another topic. As I said, everything compiles and the topology runs as expected, but under high load (3000 messages/s) the Kafka lag keeps piling up, which means the topology's throughput is too low to keep up with the incoming rate.
I've tried disabling acking with
conf.setNumAckers(0);
and
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
but that made no difference, so I guess it's not an acking issue.
I've seen in the Storm UI that the RouterBolt has an execute latency of 1.2 ms and a process latency of 0.03 ms under the high load, which leads me to believe the spout is the bottleneck. Also, the parallelism hint is 25 because "myTopic" has 25 partitions. Thanks!
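For what it's worth, here is a rough back-of-the-envelope check of those numbers. It assumes the 1.2 ms execute latency is the average time spent per tuple and that tuples are spread evenly over the 25 RouterBolt executors. The class and method names are made up purely for illustration and are not part of Storm.

```java
// Illustrative estimate only; ThroughputEstimate is a hypothetical helper, not a Storm API.
class ThroughputEstimate {

    // Tuples per second a single executor can process if each tuple takes executeLatencyMs.
    public static double perExecutorCapacity(double executeLatencyMs) {
        return 1000.0 / executeLatencyMs;
    }

    // Combined tuples per second for all executors of the bolt, assuming an even load.
    public static double boltCapacity(double executeLatencyMs, int executors) {
        return perExecutorCapacity(executeLatencyMs) * executors;
    }

    // Fraction of the bolt's estimated capacity used at the given incoming rate.
    public static double utilization(double incomingPerSec, double executeLatencyMs, int executors) {
        return incomingPerSec / boltCapacity(executeLatencyMs, executors);
    }

    public static void main(String[] args) {
        double executeLatencyMs = 1.2;   // RouterBolt execute latency from the Storm UI
        int executors = 25;              // parallelism hint of the bolt
        double incomingPerSec = 3000.0;  // load at which the lag builds up

        System.out.println("per-executor capacity (tuples/s): "
                + perExecutorCapacity(executeLatencyMs));
        System.out.println("bolt capacity (tuples/s): "
                + boltCapacity(executeLatencyMs, executors));
        System.out.println("estimated utilization: "
                + utilization(incomingPerSec, executeLatencyMs, executors));
    }
}
```

Under those assumptions the bolt would be running at roughly 14% of what it could handle at 3000 messages/s, which is consistent with my guess that the slowdown is upstream of the RouterBolt rather than in it.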