We have been struggling with this issue for a long time now. In short, our storm topology stops emitting messages from spout after some time in a random fashion. We have an automated script which re-deploys the topology at 06:00 UTC everyday after the master data refresh activity is complete.
In the last 2 weeks, our topology stopped emitting the messages for 3 times in late UTC hours (between 22:00 and 02:00). It only comes online when we restart it which is around 06:00 UTC.
I've searched for many answers & blogs but couldn't find out what's happening here. We have an un-anchored topology which is a choice we have made like 3-4 years ago. We started with 0.9.2 and now we are on 1.1.0.
I've checked all kind of logs and I'm 100% sure that the nextTuple()
method for the controller is not getting called and there are no exceptions happening in the system which may cause this. I've also checked all kind of logs we accumulate and there is not even a single ERROR or WARN logs explaining the abrupt stoppage. The INFO logs are also not that helpful. There is nothing which can be connected to this issue in worker logs or supervisor logs or nimbus logs.
This is how our spout class looks: Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
/**
* It is to open the spout
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
mongoIndexingHandler.createMongoIndexes();
}
/**
* It executes the next tuple
*/
@Override
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
}
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
msgList.remove(0);
_collector.emit(new Values(message));
}
}
else{
Utils.sleep(1000);
}
} catch (Exception e) {
_collector.reportError(e);
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
}
}
/**
* It acknowledges the messages
*/
@Override
public void ack(Object id) {
}
/**
* It tells failed messages
*/
@Override
public void fail(Object id) {
}
/**
* It declares the message name
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
and this is the topology class: DiagnosticTopology.java
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(wSize);
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}
We have fairly good servers (Xeon, 8 core, 32 GB and flash drives) in place for the production as well as testing environment and there are not external factors which can cause this issue as exception handling is everywhere in the code.
When this thing happens, it seems like everything stopped all of a sudden and there are no traces of why it happened.
Any help is highly appreciated!