Notifying channel shutdown to classes implementing

Posted 2019-05-31 12:05

Question:

  • We use the ConnectionListener interface provided by Spring AMQP to keep tabs on the underlying connection. The channel is created with auto-recovery enabled and a heartbeat of 10 minutes (for some product needs). My observation is that the connectionListener.onClose() method is not called for almost 10 minutes after the underlying RabbitMQ broker has died.

  • We also do a health status check on the API and use the connectionListener.isOpen() method to determine the state of the connection.

  • The cause is this code block in the SimpleConnection class:

    @Override
    public boolean isOpen() {
        return delegate != null
                && (delegate.isOpen() || this.delegate.getClass().getSimpleName().contains("AutorecoveringConnection"));
    }
    

isOpen() always returns true because the connection is auto-recovering. As a result, the health API does not learn of the connection failure until 10 minutes after the connection has died.

  • Is there a recommended way to notify the ConnectionListener of channel closures, since the isOpen method is clearly not serving the need?

  • Is implementing ShutdownListener the way to go? We do not have access to the channel in the ConnectionListener and cannot directly call connection.addConnectionListener(this). From the connection factory's shutdownCompleted method, is it possible to invoke onClose or any other method on the ConnectionListener to notify it of the closure?

Any other thoughts?
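
The isOpen() behaviour described above can be reproduced without a broker. The following is only a minimal sketch: the interface and both connection classes are hypothetical stand-ins (not the Spring AMQP or RabbitMQ client types), and only the boolean expression is copied from the snippet quoted in the question. It shows that a dead delegate is still reported as open whenever its class name contains "AutorecoveringConnection".

```java
public class IsOpenSketch {

    /** Hypothetical stand-in for the client library's connection type. */
    interface ClientConnection {
        boolean isOpen();
    }

    /** Stand-in for a non-recovering connection whose socket has died. */
    static class PlainConnection implements ClientConnection {
        @Override
        public boolean isOpen() {
            return false;
        }
    }

    /** Stand-in named like the auto-recovering connection; it is also dead. */
    static class AutorecoveringConnection implements ClientConnection {
        @Override
        public boolean isOpen() {
            return false;
        }
    }

    /** Same boolean logic as the SimpleConnection.isOpen() quoted in the question. */
    static boolean wrapperIsOpen(ClientConnection delegate) {
        return delegate != null
                && (delegate.isOpen()
                    || delegate.getClass().getSimpleName().contains("AutorecoveringConnection"));
    }

    public static void main(String[] args) {
        System.out.println(wrapperIsOpen(new PlainConnection()));          // false
        System.out.println(wrapperIsOpen(new AutorecoveringConnection())); // true
        System.out.println(wrapperIsOpen(null));                           // false
    }
}
```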

Answer 1:

First of all, autoRecovery is not necessary with Spring AMQP; it has always had its own recovery mechanism, which predates (by a very long time) the mechanism now provided by the client library.

It is effectively disabled on the consumer side anyway.

The reason is that by the time the client library recovers the channel, the code that was listening on that channel is long gone and the consumer is orphaned. To avoid this issue, we close the channel when the exception is detected to prevent autorecovery from recovering the channel.

So, the simple answer is to disable auto recovery in the underlying connection.

When using the SimpleMessageListenerContainer, it will continue to attempt reconnection indefinitely, based on its recoveryInterval or recoveryBackOff.

You can still use heartbeats.
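
As a rough illustration of the points above, the configuration below turns off the client library's automatic recovery, keeps a heartbeat, and lets the container handle reconnection. It is a sketch under assumptions rather than a recommended setup: the host, the queue name "some.queue", the 60-second heartbeat and the 5-second recovery interval are all hypothetical values, and the listener just prints the message.

```java
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // Native RabbitMQ client factory, wrapped by Spring AMQP below.
        com.rabbitmq.client.ConnectionFactory rabbit = new com.rabbitmq.client.ConnectionFactory();
        rabbit.setHost("localhost");               // hypothetical host
        rabbit.setAutomaticRecoveryEnabled(false); // leave recovery to Spring AMQP
        rabbit.setRequestedHeartbeat(60);          // seconds; hypothetical value
        return new CachingConnectionFactory(rabbit);
    }

    @Bean
    public SimpleMessageListenerContainer container(CachingConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("some.queue");     // hypothetical queue name
        container.setRecoveryInterval(5000L);      // retry every 5 s; hypothetical value
        container.setMessageListener((MessageListener) message -> System.out.println(message));
        return container;
    }
}
```

A shorter heartbeat detects a dead broker sooner; the value is in seconds and should be chosen to fit the product needs mentioned in the question.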

In addition, the container publishes application events when a consumer encounters an exception; you can use an ApplicationListener to be notified of those events; see the documentation for more information.
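
One possible way to consume those events is sketched below. It assumes the container is declared as a Spring bean (so that it has an event publisher) and that the event of interest is ListenerContainerConsumerFailedEvent; the class name ConsumerFailureListener and the idea of keeping the last failure reason for a health endpoint are illustrative choices, not something prescribed by the framework.

```java
import java.util.concurrent.atomic.AtomicReference;

import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerFailureListener
        implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    // Last failure reason seen, or null if no consumer has failed yet.
    private final AtomicReference<String> lastFailure = new AtomicReference<>();

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        // Record the reason so a health endpoint can report it.
        lastFailure.set(event.getReason());
        System.err.println("Consumer failed: " + event.getReason()
                + ", fatal=" + event.isFatal()
                + ", cause=" + event.getThrowable());
    }

    /** Returns the most recent failure reason, or null if none was recorded. */
    public String getLastFailure() {
        return lastFailure.get();
    }
}
```

This sketch never clears the recorded failure; a real health check would need some way to reset it once the consumer has recovered.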



Answer 2:

  • As suggested, removing the native client's autorecovery mechanism did the trick for us.
  • Now the Spring AMQP recovery mechanism kicks in when the connection is reset.
  • The isOpen() method of the ConnectionListener now returns the correct state of the connection each time, so the health status API is also fixed.
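
For a health endpoint that does not poll isOpen() at all, one option is to track the state from the listener callbacks themselves. The class below is a hedged sketch, not the poster's actual code: the name ConnectionHealth is hypothetical, and it assumes automatic recovery is disabled in the native client so that onClose() is delivered promptly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionListener;

public class ConnectionHealth implements ConnectionListener {

    // False until the first connection has been created.
    private final AtomicBoolean up = new AtomicBoolean(false);

    @Override
    public void onCreate(Connection connection) {
        up.set(true);   // a new connection was established
    }

    @Override
    public void onClose(Connection connection) {
        up.set(false);  // the connection was closed or lost
    }

    /** Value a health status API could expose. */
    public boolean isUp() {
        return up.get();
    }
}
```

It would be registered once on the Spring connection factory, for example with connectionFactory.addConnectionListener(new ConnectionHealth()).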