We have been struggling with this issue for a long time now. In short, our storm topology stops emitting messages from spout after some time in a random fashion. We have an automated script which re-deploys the topology at 06:00 UTC everyday after the master data refresh activity is complete.
In the last 2 weeks, our topology stopped emitting the messages for 3 times in late UTC hours (between 22:00 and 02:00). It only comes online when we restart it which is around 06:00 UTC.
I've searched for many answers & blogs but couldn't find out what's happening here. We have an un-anchored topology which is a choice we have made like 3-4 years ago. We started with 0.9.2 and now we are on 1.1.0.
I've checked all kind of logs and I'm 100% sure that the nextTuple()
method for the controller is not getting called and there are no exceptions happening in the system which may cause this. I've also checked all kind of logs we accumulate and there is not even a single ERROR or WARN logs explaining the abrupt stoppage. The INFO logs are also not that helpful. There is nothing which can be connected to this issue in worker logs or supervisor logs or nimbus logs.
This is how our spout class looks: Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
/**
* It is to open the spout
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
mongoIndexingHandler.createMongoIndexes();
}
/**
* It executes the next tuple
*/
@Override
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
}
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
msgList.remove(0);
_collector.emit(new Values(message));
}
}
else{
Utils.sleep(1000);
}
} catch (Exception e) {
_collector.reportError(e);
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
}
}
/**
* It acknowledges the messages
*/
@Override
public void ack(Object id) {
}
/**
* It tells failed messages
*/
@Override
public void fail(Object id) {
}
/**
* It declares the message name
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
and this is the topology class: DiagnosticTopology.java
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(wSize);
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}
We have fairly good servers (Xeon, 8 core, 32 GB and flash drives) in place for the production as well as testing environment and there are not external factors which can cause this issue as exception handling is everywhere in the code.
When this thing happens, it seems like everything stopped all of a sudden and there are no traces of why it happened.
Any help is highly appreciated!
We have observed this with our application where we had very busy CPU and all other threads were waiting for their turn. When we tried to find root cause using JVisualVM to check resource usage, we found that some function in some bolts were causing lot of overhead and CPU time. Please check via. any profiling tool if there are blocked threads in CPU critical path of nextTuple() method or are you receiving any data for the same from upstream.
I don't know what is causing your issue, but I'd recommend that you start by checking if upgrading to the latest Storm version resolves the issue. I know of at least two issues related to worker threads dying and not coming back up https://issues.apache.org/jira/browse/STORM-1750 https://issues.apache.org/jira/browse/STORM-2194. 1750 is fixed in 1.1.0, but 2194 is not fixed until 1.1.1.
In case upgrading doesn't fix the issue for you, you might be able to debug it by doing the following.
Next time your topology is hanging, go open Storm UI and find your spout. It'll show the list of executors running that spout, along with which workers are responsible for running them. Pick one of the workers where the spout executor isn't emitting anything. Open a shell on the machine running that worker, and find the worker JVM's process id. You can do this easily with
jps -m
.Example output showing the worker JVM with port 6701 on my local machine, which has pid 7592:
Trigger a thread dump by doing
kill -3 <pid>
, or usejstack <pid>
if you prefer.In the thread dump, you should be able to find the executor thread that's hanging. For instance, when I do a thread dump for a topology with a spout called "word", where one of the spout executors has number 13, I see
edit: Stack overflow won't let me post the stack trace because the heuristic looking for unformatted code is bad. I've spent probably as long trying to post the stack trace as writing the original answer, so I can't be bothered to keep trying. Here's the trace that should have been here https://pastebin.com/2Sz5kkQ1
which shows me what executor 13 is currently doing. In this case it's sleeping during a call to nextTuple.
If you can find out what your hanging executor is doing, you should be much better equipped to solve the issue, or report a bug to Storm.