Why is there a delay in Spring AMQP Message dispat

Posted 2019-08-11 12:05

Question:

I am using Spring AMQP in my message-driven application. I noticed a nearly constant delay of around 300 ms between invocations of my message listener, even though I am sure the queue is full of messages. The log below shows this delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle, with a call to BlockingQueueConsumer.handleDelivery from another thread in between:

2015-05-12 12:46:18,655 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,655 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [pool-1-thread-6 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@18dc305(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:16:06 CEST 2015, messageId=143134227498011576, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=8, messageCount=0])
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:18,967 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [pool-1-thread-7 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1aaa7d8(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:17:08 CEST 2015, messageId=143134227498011584, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=9, messageCount=0])
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:19,280 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [pool-1-thread-3 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1c893d2(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:18:07 CEST 2015, messageId=143134227498011592, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=10, messageCount=0])
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done

The log was captured while the queue was definitely full of messages. The relevant parts of my Spring configuration file look like this:

<rabbit:connection-factory id="amqpConnectionFactory" connection-factory="clientConnectionFactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientConnectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpConnectionFactory" />
    <property name="messageConverter" ref="marshallingMessageConverter"/>
</bean>

<bean id="marshallingMessageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbMarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbMarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatListenerContainer" connection-factory="amqpConnectionFactory" auto-startup="false">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queueMessageHandler" class="com.my.package.QueueMessageHandler"/>

I am struggling to find the reason for this delay. As far as I understand, it originates in Spring's BlockingQueueConsumer. I am not sure what is happening, or why BlockingQueueConsumer.handleDelivery is called from another thread.

Any help is greatly appreciated!

Answer 1:

Maybe a network issue?

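With the default configuration, the container handles one message at a time, and the broker does not send the next message until the ack for the previous one has been sent. Each message therefore costs at least one network round trip.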

Try increasing the prefetch on the listener container so the container always has a message available when the consumer thread is ready.
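As a rough sketch of that suggestion, the listener container from the question could be given a larger prefetch through the `prefetch` attribute of `<rabbit:listener-container>`. The value 50 is an arbitrary assumption for illustration, not a recommendation; it would need tuning against message size and processing time.

```xml
<!-- Same container as in the question; only the prefetch attribute is new. -->
<!-- prefetch="50" is an illustrative value (assumption), not a tuned one. -->
<rabbit:listener-container id="heartbeatListenerContainer"
        connection-factory="amqpConnectionFactory"
        auto-startup="false"
        prefetch="50">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

With a prefetch above 1, the broker can push further messages while the listener is still working on the current one, so the consumer thread should find a delivery already waiting in its local queue.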

Take a look at a network trace (Wireshark or similar).

EDIT:

If you have a poor network and can live with the increased possibility of duplicate deliveries, you can also consider increasing the txSize so that an ack is not sent for every message. Be sure to set it to something less than the prefetch, though.
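A possible configuration for this, assuming the `transaction-size` attribute of `<rabbit:listener-container>` is the XML counterpart of txSize in the Spring AMQP version in use. Both numbers are hypothetical and chosen only so that the transaction size stays below the prefetch.

```xml
<!-- Illustrative values (assumptions): acks are sent per batch of up to 10 messages, -->
<!-- while the broker may keep up to 50 unacknowledged messages in flight. -->
<rabbit:listener-container id="heartbeatListenerContainer"
        connection-factory="amqpConnectionFactory"
        auto-startup="false"
        prefetch="50"
        transaction-size="10">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>
```

The trade-off is the one named above: if the consumer fails before a batch is acknowledged, the broker redelivers every message in that batch, so the listener should tolerate duplicates.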