We're experiencing an issue with one of our MQTT subscribers in Spring Integration (4.0.3.RELEASE, running on Tomcat 7 with the Paho MQTT client 0.4.0).
The issue is with a subscriber on a heavily used topic (lots of messages). The messages are published by devices in the field connecting over GPRS.
Spring Integration and the broker (Mosquitto) are running on the same server.
The issue seems to appear after a couple of redeploys of the web application on Tomcat without restarting the server. When it occurs, restarting the Tomcat instance fixes things for a while.
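For reference, each subscriber is a plain Spring Integration inbound adapter. The configuration isn't shown here, so the snippet below is only an illustrative sketch of how such a subscriber is typically wired (broker URL, channel and bean names are placeholders, not our exact setup):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MqttSubscriberConfig {

    @Bean
    public MessageChannel liveDataChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter liveDataAdapter() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "tcp://localhost:1873",   // broker URL (placeholder)
                        "vdm-dev-live",           // client id seen in the broker logs
                        "vdm/+/+/+/liveData");    // topic filter seen in the broker logs
        adapter.setOutputChannel(liveDataChannel());
        return adapter;
    }
}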
Here's the chain of events, from the Mosquitto logs (the vdm-dev-live subscriber is the one with the issues):
When Spring Integration starts, we see all subscribers connecting to their topics:
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-live (c1, k60).
1409645645: Sending CONNACK to vdm-dev-live (0)
1409645645: Received SUBSCRIBE from vdm-dev-live
1409645645: vdm/+/+/+/liveData (QoS 1)
1409645645: Sending SUBACK to vdm-dev-live
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmReq
1409645645: vdm/+/+/+/firmware/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-cfgReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-cfgReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-cfgReq
1409645645: vdm/+/+/+/config/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-cfgReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmStat (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmStat (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmStat
1409645645: vdm/+/+/firmware/status (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmStat
We see messages going back and forth:
1409645646: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
We see the ping requests from the various subscribers:
1409645705: Received PINGREQ from vdm-dev-update
1409645705: Sending PINGRESP to vdm-dev-update
1409645705: Received PINGREQ from vdm-dev-live
1409645705: Sending PINGRESP to vdm-dev-live
1409645705: Received PINGREQ from vdm-dev-fmReq
1409645705: Sending PINGRESP to vdm-dev-fmReq
1409645705: Received PINGREQ from vdm-dev-cfgReq
1409645705: Sending PINGRESP to vdm-dev-cfgReq
1409645705: Received PINGREQ from vdm-dev-fmStat
1409645705: Sending PINGRESP to vdm-dev-fmStat
But all of a sudden we see this:
1409645776: Socket error on client vdm-dev-live, disconnecting.
And at that point the subscriber is dead: we no longer see any ping requests from it, and it no longer processes any messages from that topic. At the broker level everything is still fine; I have debug-log subscribers (using Node.js) and they keep processing the messages from that topic just fine, so the issue is at the subscriber level.
In the Tomcat logs we also see this:
Sep 02, 2014 10:16:05 AM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: vdm-dev-live: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,409,645,705,714 lastInboundActivity=1,409,645,755,075
But Paho doesn't do any cleanup or restart of this subscriber.
I'm also seeing this in the Tomcat logs:
SEVERE: The web application [/vdmapp] appears to have started a thread named [MQTT Snd: vdm-dev-live] but has failed to stop it. This is very likely to create a memory leak.
I also noticed a lot of threads for that subscriber that are stuck during shutdown:
"MQTT Snd: vdm-dev-live" daemon prio=10 tid=0x00007f1b44781800 nid=0x3061 in Object.wait() [0x00007f1aa7bfa000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1258)
- locked <0x00000007ab13e218> (a java.lang.Thread)
at java.lang.Thread.join(Thread.java:1332)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.stop(CommsReceiver.java:77)
- locked <0x00000007ab552730> (a java.lang.Object)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:294)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:154)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:131)
at java.lang.Thread.run(Thread.java:722)
Any idea what is causing this and how it can be prevented?
First of all, please share the versions of Spring Integration and the Paho client.

Regarding the "after doing a couple of redeploys" part, I see this code in CommsReceiver#stop():
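(Roughly, from memory of the Paho 0.4.0 sources; the field names and details may differ slightly:)

public void stop() {
    synchronized (lifecycle) {
        if (running) {
            running = false;
            receiving = false;
            if (!Thread.currentThread().equals(recThread)) {
                try {
                    // Wait for the receiver thread to finish.
                    recThread.join();
                }
                catch (InterruptedException ex) {
                    // ignored
                }
            }
        }
    }
    recThread = null;
}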
where Thread.join(), per its Javadoc, "Waits for this thread to die."

I'm really not sure what that means for how things should proceed past that wait, but won't the redeploy be a bottleneck here, allowing those daemon threads to continue to live because the main thread doesn't die?

Following up from my comments in @Artem's answer...
There appears to be a deadlock in the Paho client. See line 573 of your Gist; the Snd thread is waiting for the Rec thread to terminate. At line 586, the Rec thread is blocked because the inbound queue is full (10). For all the cases that look like this, there is no Call thread, so the queue-full condition will never be cleared. Notice that at line 227 the trifecta of threads is working fine (presumably a reconnection after a redeploy?). Wherever the threads are dead, there is no Call thread.

I think the problem is in the Paho client: in the CommsCallback.run() method there is a catch on Throwable which closes the connection, but because the queue is full, the Rec thread is never notified (and so won't clean up). So it seems the message delivery is throwing an exception and, when the queue is full, that causes this deadlock. The Paho client needs a fix, but in the meantime we can figure out what the exception is.

If the exception is downstream of the inbound gateway, you should see a log entry like the sketch below...
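(Roughly, paraphrasing the Spring Integration adapter code from memory rather than quoting it exactly, the inbound adapter wraps the downstream send like this:)

// Inside the adapter's MqttCallback.messageArrived() implementation (approximate):
try {
    sendMessage(message);
}
catch (RuntimeException e) {
    logger.error("Unhandled exception for " + message.toString(), e);
    throw e;
}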
Since this log is produced in MqttCallback.messageArrived(), if you are not seeing such errors, the problem may be in the Paho client itself.

The exception handling in CommsCallback looks like this...
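(Approximately; reconstructed from memory of the Paho 0.4.0 sources, so the trace details may differ:)

catch (Throwable ex) {
    // User code could throw an Error or Exception,
    // e.g. a NoClassDefFoundError.
    // @TRACE 714=callback threw exception
    log.fine(className, methodName, "714", null, ex);
    running = false;
    // This shuts the connection down, but nothing notifies the Rec thread
    // that is blocked waiting for space in the full inbound queue.
    clientComms.shutdownConnection(null, new MqttException(ex));
}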
(That is where they should call spaceAvailable.notifyAll() to wake the (dying) Rec thread.)

So, turning on FINE logging for the Paho client should tell you where/what the exception is.
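One way to do that, assuming the default java.util.logging backend that the Paho client ships with and that its logger names live under the org.eclipse.paho.client.mqttv3 package (a sketch only; in Tomcat you can set the equivalent levels in conf/logging.properties instead):

import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class PahoTraceLogging {

    // Keep a strong reference so the JUL logger and its level are not garbage collected.
    private static final Logger PAHO_LOGGER =
            Logger.getLogger("org.eclipse.paho.client.mqttv3");

    private PahoTraceLogging() {
    }

    // Call this once at startup, before the MQTT adapters connect.
    public static void enableFineLogging() {
        PAHO_LOGGER.setLevel(Level.FINE);

        // The default console handler is usually capped at INFO,
        // so attach one that lets FINE records through.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        PAHO_LOGGER.addHandler(handler);
    }
}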