Spring Integration: Application leaking SimpleAsyn

Posted 2019-08-03 23:19

Question:

We have a Spring application in production that appears to be leaking threads.

When I take a thread dump, I see the following threads, which all seem to be waiting. Here is one example, but there are thousands of them:

    "SimpleAsyncTaskExecutor-1801" prio=10 tid=0x00007fa668413800 nid=0x376c waiting on condition [0x00007fa4fbaff000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007777e2ba8> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
        at org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel.receive(MessagingTemplate.java:415)
        at org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel.receive(MessagingTemplate.java:409)
        at org.springframework.integration.core.MessagingTemplate.doReceive(MessagingTemplate.java:317)
        at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:341)
        at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:255)
        at org.springframework.integration.core.MessagingTemplate.convertSendAndReceive(MessagingTemplate.java:290)
        at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:224)
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:203)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:306)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:269)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.access$200(GatewayProxyFactoryBean.java:71)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean$AsyncInvocationTask.call(GatewayProxyFactoryBean.java:499)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.lang.Thread.run(Thread.java:745)

Eventually the number of threads grows until the server becomes unresponsive, and only a restart brings it back.

This seems to involve a Spring Integration gateway that sent a message and is waiting for a reply that it never gets. The only place in our application that uses a gateway is a notification channel that publishes these messages to a RabbitMQ exchange through a service interface.

This is the service interface code:

@Component
public interface INotificationSender {

    Future<Void> sendNotification(@Payload Object notification,
            @Header("routingKey") String routingKey,
            @Header("notificationType") String type);
}

And this is the relevant spring configuration:

<!-- Spring Integration RabbitMQ adapter -->        
<rabbit:template
    id="amqpTemplate"
    connection-factory="notificationConnectionFactory" />

<rabbit:connection-factory
    id="notificationConnectionFactory"
    addresses="${notificationChannel.rabbitHost1}:${notificationChannel.rabbitPort}, ${notificationChannel.rabbitHost2}:${notificationChannel.rabbitPort}"
    username="${notificationChannel.rabbitUsername}"
    password="${notificationChannel.rabbitPassword}"
    virtual-host="${notificationChannel.rabbitVirtualHost}"/>

<rabbit:topic-exchange
    name="notificationExchange"/>


<!-- Spring Integration AMQP -->        
<int-amqp:outbound-channel-adapter 
    id="notificationChannelAdapter"
    channel="notificationChannelEnc"
    exchange-name="notificationExchange"
    routing-key-expression="headers['routingKey']"
    mapped-request-headers="STANDARD_REQUEST_HEADERS, notificationType"/>


<!-- Spring Integration Core -->
<int:channel
    id="notificationChannelEnc">
    <int:interceptors>
        <int:wire-tap channel="loggingChannel" />
    </int:interceptors>
</int:channel>

<int:channel id="notificationChannel"/>

<int:object-to-json-transformer
    id="NotificationEncoder"
    content-type="text/x-json"
    input-channel="notificationChannel"
    output-channel="notificationChannelEnc"/>       

<int:gateway
    id="notificationGateway"
    default-request-channel="notificationChannel"
    service-interface="com.ericsson.ericloud.commander.notification.sender.INotificationSender"/>

We are using Spring 3.2.3.RELEASE and Spring Integration 3.0.0.M2.

Does anybody have an idea how to handle this so that these threads terminate properly?

Thanks, /Sebastien

Answer 1:

The Future<Void> return type is the problem.

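I assume you want to send the notification asynchronously and not wait for any reply. However, the gateway treats a Future return type as a hint to run the request asynchronously and still waits for a reply on that async thread. Since the outbound channel adapter is one-way and never produces a reply, each of those threads stays parked in TemporaryReplyChannel.receive(), as your thread dump shows.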
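The blocking itself can be reproduced without Spring. The following is only a JDK-level sketch of the mechanism, not the gateway's actual code: the class and method names are made up for illustration, the CountDownLatch stands in for the temporary reply channel, and the threads are marked as daemons (unlike the real ones) so that the demo can exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** JDK-only analogue of the leak; no Spring classes are involved. */
class ReplyWaitLeakDemo {

    /** Starts one thread per "send"; each waits for a reply that is never delivered. */
    static List<Thread> sendAndWaitForReply(int sends) {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < sends; i++) {
            // Stands in for the temporary reply channel: nothing ever calls countDown().
            CountDownLatch reply = new CountDownLatch(1);
            Thread worker = new Thread(() -> {
                try {
                    reply.await(); // parks with no timeout, so the thread never finishes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "demo-async-" + i);
            worker.setDaemon(true); // assumption for the demo only: lets the JVM exit
            worker.start();
            workers.add(worker);
        }
        return workers;
    }

    /** Polls until every worker is in WAITING or the deadline passes; returns the count. */
    static long countParked(List<Thread> workers, long maxWaitMillis) {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (true) {
            long parked = workers.stream()
                    .filter(t -> t.getState() == Thread.State.WAITING)
                    .count();
            if (parked == workers.size() || System.currentTimeMillis() >= deadline) {
                return parked;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return parked;
            }
        }
    }

    public static void main(String[] args) {
        List<Thread> workers = sendAndWaitForReply(5);
        System.out.println("parked threads: " + countParked(workers, 2000));
    }
}
```

With a thread-per-task executor, every call adds one more such thread, which would match the steadily growing SimpleAsyncTaskExecutor-N names in the dump.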

To fix this, change the return type to plain void and use an ExecutorChannel as the gateway's default-request-channel, so that the send is still handed off to another thread but nothing waits for a reply.
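To illustrate the design choice, here is a rough JDK-only analogue of a void method backed by an executor. It is a sketch under assumptions, not Spring Integration code: FireAndForgetDemo, publish and shutdownAndWait are hypothetical names, the fixed pool plays the role of the executor behind the channel, and the counter stands in for the actual publish to RabbitMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only analogue of a void gateway method in front of an executor-backed channel. */
class FireAndForgetDemo {

    // Plays the role of the task executor behind the request channel.
    private final ExecutorService pool;
    private final AtomicInteger published = new AtomicInteger();

    FireAndForgetDemo(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    /** Returns void: the caller hands the message off and nobody waits for a reply. */
    void sendNotification(Object notification) {
        pool.execute(() -> publish(notification));
    }

    /** Hypothetical stand-in for the one-way publish to the broker. */
    private void publish(Object notification) {
        published.incrementAndGet();
    }

    int publishedCount() {
        return published.get();
    }

    /** Stops accepting work and waits for queued sends; true if all finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FireAndForgetDemo sender = new FireAndForgetDemo(2);
        for (int i = 0; i < 100; i++) {
            sender.sendNotification("event-" + i);
        }
        boolean drained = sender.shutdownAndWait(5000);
        System.out.println("drained=" + drained + ", published=" + sender.publishedCount());
    }
}
```

The point of the sketch is that the worker threads are reused and finish each task as soon as the publish returns, so the thread count stays bounded by the pool size no matter how many notifications are sent.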

See the Reference Manual for more information.