Good evening everybody
I am running SpringXD-1.3.0.RELEASE (configuration: 3 admins and 3 containers) with RabbitMQ 3.5.7 (3 nodes) in the following scenario:
stream create --name fire --definition "time --timeUnit=MILLISECONDS > topic:fire" --deploy
stream create --name fireeater_1 --definition "topic:fire > null"
stream create --name fireeater_2 --definition "topic:fire > null"
stream create --name fireeater_3 --definition "topic:fire > null"
stream create --name fireeater_4 --definition "topic:fire > null"
job create test123 --definition "timestampfile --directory=/tmp"
stream deploy fireeater_1 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_2 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_3 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
stream deploy fireeater_4 --properties "module.null.consumer.durableSubscription=true,module.null.count=0,module.null.consumer.prefetch=500"
job deploy test123
After a few minutes, this exception is thrown on one container:
2016-01-13T18:08:47+0100 1.3.0.RELEASE WARN xdbus.job:test123-6 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:721) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:711) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:569) ~[amqp-client-3.5.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 6
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552) ~[amqp-client-3.5.2.jar:na]
... 1 common frames omitted
Eventually the container processes the messages from RabbitMQ very slowly. Only undeploying the "fire" stream or restarting the container helps.
It seems that a message channel used by the job is being closed by the stream.
-- Update -- Today I saw more output in the log, and it seems that the channel is closed by the message bus itself. Here is the log for the exception:
2016-02-23T15:05:18+0100 1.3.0.RELEASE ERROR AMQP Connection 10.0.3.210:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN SimpleAsyncTaskExecutor-2 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1352) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1096) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1080) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$800(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-1.5.2.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@69022500]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891) ~[na:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:67) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:833) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$400(CachingConnectionFactory.java:822) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:474) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:450) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:419) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:412) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1200(CachingConnectionFactory.java:86) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:139) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:71) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1278) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1271) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:619) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:717) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:329) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:321) ~[spring-integration-amqp-4.2.2.RELEASE.jar!/:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
... 25 common frames omitted
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:143) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503) ~[amqp-client-3.5.2.jar!/:na]
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar!/:na]
... 44 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.5.2.jar!/:na]
... 48 common frames omitted
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[na:na]
... 1 common frames omitted
2016-02-23T15:05:18+0100 1.3.0.RELEASE ERROR AMQP Connection 10.0.3.210:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN xdbus.test123-6 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) ~[amqp-client-3.5.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[amqp-client-3.5.2.jar:na]
... 1 common frames omitted
2016-02-23T15:05:18+0100 1.3.0.RELEASE WARN xdbus.fireeater_1-1 listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) ~[amqp-client-3.5.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 3289
at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:82) ~[amqp-client-3.5.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) ~[amqp-client-3.5.2.jar:na]
... 1 common frames omitted
Does anyone know why the exception is thrown and how to avoid it?
The problem seems to be a reply coming back on a channel that is already closed. When that occurs, the connection is torn down and all users will get that exception. As far as I can tell from the rabbit client code, that shouldn't happen.
Everything should recover though.
Spring AMQP has a channel cache that avoids closing channels and allows them to be reused; increasing the channel cache size (from the default of 1) might solve the issue. You seem to be churning through a lot of channels.
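To illustrate why a cache size of 1 can lead to heavy channel churn, here is a minimal toy model in plain Java. It is not Spring AMQP's actual implementation; `ChannelCacheSketch`, `FakeChannel` and `simulate` are hypothetical names made up for this sketch, and the numbers (5 concurrent senders, 1000 bursts) are arbitrary assumptions. The only behaviour it models is that a released channel is kept when the cache has room and closed otherwise.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a channel cache; NOT Spring AMQP's real code. */
public class ChannelCacheSketch {

    /** Hypothetical stand-in for an AMQP channel; only tracks open/closed. */
    static final class FakeChannel {
        final int number;
        boolean open = true;
        FakeChannel(int number) { this.number = number; }
    }

    private final int cacheSize;
    private final Deque<FakeChannel> idle = new ArrayDeque<FakeChannel>();
    private int opened = 0;
    private int closed = 0;

    public ChannelCacheSketch(int cacheSize) { this.cacheSize = cacheSize; }

    /** Reuse an idle channel if one is cached, otherwise open a new one. */
    FakeChannel acquire() {
        FakeChannel cached = idle.poll();
        if (cached != null) {
            return cached;
        }
        opened++;
        return new FakeChannel(opened);
    }

    /** Keep the channel if the cache has room, otherwise close it. */
    void release(FakeChannel channel) {
        if (idle.size() < cacheSize) {
            idle.push(channel);
        } else {
            channel.open = false;
            closed++;
        }
    }

    public int opened() { return opened; }
    public int closed() { return closed; }

    /** Run `rounds` bursts in which `concurrent` senders each hold a channel at once. */
    public static ChannelCacheSketch simulate(int cacheSize, int concurrent, int rounds) {
        ChannelCacheSketch cache = new ChannelCacheSketch(cacheSize);
        for (int r = 0; r < rounds; r++) {
            List<FakeChannel> inUse = new ArrayList<FakeChannel>();
            for (int i = 0; i < concurrent; i++) {
                inUse.add(cache.acquire());
            }
            for (FakeChannel c : inUse) {
                cache.release(c);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        ChannelCacheSketch small = simulate(1, 5, 1000);
        ChannelCacheSketch large = simulate(100, 5, 1000);
        System.out.println("cacheSize=1:   opened=" + small.opened() + " closed=" + small.closed());
        System.out.println("cacheSize=100: opened=" + large.opened() + " closed=" + large.closed());
    }
}
```

In this model, a cache of 1 with 5 concurrent senders opens and closes thousands of channels, while a cache of 100 opens 5 channels once and never closes any. A real broker connection is more complicated, but the sketch shows why a larger cache should reduce the number of channel open/close operations.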
Spring XD currently doesn't expose the channel cache size as a property. To work around that, put the following file (named rabbit-bus.xml) under xd/config/META-INF/spring-xd/bus. This replaces the standard bus configuration (it's a copy except for the connection factory configuration).
It will set the cache to 100 by default, but you can override it in servers.yml.
EDIT
There is a better workaround using the bus extension mechanism: simply put the replacement connection factory beans in the bus extension directory...
Any file with a name ending in .xml will be found in that directory.