Reconnect to ActiveMQ server after network failure

Posted 2019-05-11 11:33

Question:

We are using ActiveMQ 5.8.0 to connect our Java application via TCP to another system. Request/reply (synchronous, with the reply sent to a temporary queue) works fine between our client and its counterpart.

But we are not sure how to handle "abnormal" situations such as a short network failure. We are testing whether the application can continue working normally after the socket reconnects.

So far we have not managed that, because the client does not seem to reconnect to the broker automatically as we expected. We considered implementing the reconnect ourselves in a custom TransportListener, but this is not recommended (see "Transport Listener and ActiveMq restart", where ActiveMQ member Tim Bish suggests using the failover protocol). But failover only handles switching to another broker when one is down, right?

Currently we use the TransportListener only to monitor the connection state in the log file, which produces log entries like the following (see the long log below).

ActiveMQ Connection Executor: ... Producer received: java.net.SocketException: ...

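import java.io.IOException;

import org.apache.activemq.transport.TransportListener;

// Logs transport events only; it does not attempt to reconnect.
// logInfo/logError are our own logging helpers (not shown).
class ConnectionStateMonitor
    implements TransportListener
{
    @Override
    public void onCommand(Object command)
    {
        logInfo("Producer received: " + command);
    }

    @Override
    public void onException(IOException exception)
    {
        logError("Producer received: " + exception);
    }

    // The single "r" in the method name is the spelling used by the ActiveMQ API.
    @Override
    public void transportInterupted()
    {
        logError("Producer received transport interruption.");
    }

    @Override
    public void transportResumed()
    {
        logInfo("Producer received transport resumption.");
    }
}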

Sorry for posting a long log excerpt with stack traces below, but maybe someone will immediately see what is missing.

We are currently working with the following settings:

  • wireFormat.maxInactivityDuration=20000
  • max wait time for a reply: 10000 ms

Any ideas on how to solve this problem (and how to format the log below nicely)?

Thanks in advance!

2013-06-05 14:09:21,676|main |Signal |DEBUG|Wait For 60000
2013-06-05 14:09:30,279|ActiveMQ InactivityMonitor WriteCheckTimer|AbstractInactivityMonitor|DEBUG|WriteChecker 6666 ms elapsed since last write check.
2013-06-05 14:09:30,282|ActiveMQ InactivityMonitor Worker|AbstractInactivityMonitor|DEBUG|Running WriteCheck[tcp://192.168.1.29:61616]
2013-06-05 14:09:36,945|ActiveMQ InactivityMonitor WriteCheckTimer|AbstractInactivityMonitor|DEBUG|WriteChecker 6666 ms elapsed since last write check.
2013-06-05 14:09:36,945|ActiveMQ InactivityMonitor Worker|AbstractInactivityMonitor|DEBUG|Running WriteCheck[tcp://192.168.1.29:61616]
2013-06-05 14:09:40,579|ActiveMQ Transport: tcp://test-server/192.168.1.29:61616@54127|ThreadPoolUtils|DEBUG|Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@13e0aba is shutdown: true and terminated: false took: 0.000 seconds.
Caused by: javax.jms.JMSException: Connection reset
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1391)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1912)
    at org.apache.activemq.ActiveMQMessageProducer.<init>(ActiveMQMessageProducer.java:125)
    at org.apache.activemq.ActiveMQSession.createProducer(ActiveMQSession.java:956)
    at de.wer.services.activemq.ActiveMqConnectionImpl.sendRequest(ActiveMqConnectionImpl.java:218)
    ... 4 more
2013-06-05 14:09:40,579|ActiveMQ Transport: tcp://test-server/192.168.1.29:61616@54127|ActiveMQConnection|DEBUG|Async exception with no exception listener: java.net.SocketException: Connection reset
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    ... 1 more
2013-06-05 14:09:40,583|ActiveMQ Connection Executor: tcp://test-server/192.168.1.29:61616@54127|TcpTransport|DEBUG|Stopping transport tcp://test-server/192.168.1.29:61616@54127
2013-06-05 14:09:40,583|ActiveMQ Connection Executor: tcp://test-server/192.168.1.29:61616@54127|TaskRunnerFactory|DEBUG|Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@186f247
2013-06-05 14:09:40,584|ActiveMQ Task-1 |TcpTransport|DEBUG|Closed socket Socket[addr=test-server/192.168.1.29,port=61616,localport=54127]
Caused by: javax.jms.JMSException: Connection reset
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1391)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1912)
    at org.apache.activemq.ActiveMQMessageProducer.<init>(ActiveMQMessageProducer.java:125)
    at org.apache.activemq.ActiveMQSession.createProducer(ActiveMQSession.java:956)
    at de.wer.services.activemq.ActiveMqConnectionImpl.sendRequest(ActiveMqConnectionImpl.java:218)
    ... 4 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    ... 1 more
2013-06-05 14:09:40,584|ActiveMQ Connection Executor: tcp://test-server/192.168.1.29:61616@54127|ThreadPoolUtils|DEBUG|Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@186f247
Caused by: javax.jms.JMSException: Connection reset
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1391)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1912)
    at org.apache.activemq.ActiveMQMessageProducer.<init>(ActiveMQMessageProducer.java:125)
    at org.apache.activemq.ActiveMQSession.createProducer(ActiveMQSession.java:956)
    at de.wer.services.activemq.ActiveMqConnectionImpl.sendRequest(ActiveMqConnectionImpl.java:218)
    ... 4 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    ... 1 more
2013-06-05 14:09:40,587|ActiveMQ Connection Executor: tcp://test-server/192.168.1.29:61616@54127|ActiveMqConnectionImpl|ERROR|Producer received: java.net.SocketException: Connection reset

Answer 1:

It sounds like failover is definitely what you want to use. You don't need a second broker to fail over to: with a single broker in the list, the failover URI simply instructs the client library to reconnect to it, so you can do something like:

failover:(tcp://myBroker:61616)
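
If you want to tune the reconnect behaviour, the failover transport also accepts options as URI query parameters. Below is a minimal sketch of assembling such a URI in plain Java. The helper class FailoverUri is hypothetical (it is not part of ActiveMQ), and the option values are illustrative assumptions rather than recommendations. The option names initialReconnectDelay, maxReconnectAttempts and timeout are failover transport options, but check their exact semantics and defaults for your ActiveMQ version.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of ActiveMQ): assembles a failover URI string.
final class FailoverUri {

    // Wraps the broker URIs in failover:(...) and appends options as ?key=value&key=value.
    static String build(List<String> brokers, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("failover:(")
                .append(String.join(",", brokers))
                .append(")");
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("initialReconnectDelay", "1000"); // ms before the first reconnect attempt (assumed value)
        options.put("maxReconnectAttempts", "-1");    // -1: keep retrying (assumed value)
        options.put("timeout", "10000");              // ms a send may block while disconnected (assumed value)

        // Prints: failover:(tcp://myBroker:61616)?initialReconnectDelay=1000&maxReconnectAttempts=-1&timeout=10000
        System.out.println(build(List.of("tcp://myBroker:61616"), options));
    }
}
```

The resulting string would then be passed as the broker URL, for example to the ActiveMQConnectionFactory constructor, in place of the plain tcp:// URI.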


Tags: jms activemq