-->

Spring Integration - Reliable TCP for high volume

Published 2020-07-27 09:38

Question:

I'm using Spring Integration for a TCP server that keeps connections open to a few thousand clients. I need the server to throttle clients under excessive load without losing messages.

My server configuration:

<task:executor id="myTaskExecutor"
    pool-size="4-8"
    queue-capacity="0"
    rejection-policy="CALLER_RUNS" />

<int-ip:tcp-connection-factory id="serverTcpConFact"
    type="server"
    port="60000"
    using-nio="true"
    single-use="false"
    so-timeout="300000"
    task-executor="myTaskExecutor" />

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
    channel="tcpInbound"
    connection-factory="serverTcpConFact" />

<channel id="tcpInbound" />

<service-activator input-channel="tcpInbound"
    ref="myService"
    method="test" />

<beans:bean id="myService" class="org.test.tcpserver.MyService" />

Since the default task executor for the connection factory is unbounded, I use a pooled task executor to prevent out of memory errors.

A simple client for load testing:

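import java.io.DataOutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class TCPClientTest {
    // Keep every socket referenced so the connections stay open for the whole test.
    static List<Socket> sl = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10000; i++) {
            Socket socket = new Socket("localhost", 60000);
            sl.add(socket);
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            // Send one CRLF-terminated message per connection.
            out.writeBytes("connection " + i + "\r\n");
            System.out.println("Using connection #" + i);
        }
        // Wait for a key press so the connections are not closed when main() exits.
        System.in.read();
    }
}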

When I run it, the server receives only about 10-20 messages and then the client gets a "Connection refused: connect" exception. After that the server can't accept any new connections, even after the connection timeout has elapsed. Increasing the pool size only lets a few more messages through before the same failure.

EDIT

I'm using Spring Integration 3.0.2.RELEASE. For production I'm using 8-40 threads, but that only makes this test fail later, after several hundred connections.

MyService.test() doesn't do much...

public class MyService {
    public void test(byte[] input) {
        System.out.println("Received: " + new String(input));
    }
}

Here is the log with trace level logging.

Sources

Answer 1:

I see what the problem is, please open a JIRA issue.

The issue is the CALLER_RUNS rejection policy with a 0 length queue in the executor.

There is one thread that handles all IO events (usually myTaskExecutor-1). When a read event fires, it queues an execution to read the data; the reader thread in turn queues an execution to assemble the data (which will block until a complete message, in your case terminated by the CRLF, arrives).
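To make the "block until a complete message arrives" step concrete, here is a minimal sketch of CRLF framing over a plain InputStream. It is not the code of ByteArrayCrLfSerializer; the class and method names are hypothetical and it only illustrates the idea. On a socket stream, read() blocks until a byte is available, which is why a thread doing this work is stuck until the terminator shows up.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical illustration only; not Spring Integration's implementation.
public class CrLfFramingSketch {

    /**
     * Reads bytes until a CRLF pair is seen and returns the payload without
     * the terminator. Each read() call may block if the stream is a socket.
     */
    public static byte[] readMessage(InputStream in) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int previous = -1;
        int current;
        try {
            while ((current = in.read()) != -1) {
                if (previous == '\r' && current == '\n') {
                    byte[] withCr = buffer.toByteArray();
                    // The CR was already buffered; drop it from the result.
                    return Arrays.copyOf(withCr, withCr.length - 1);
                }
                buffer.write(current);
                previous = current;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        throw new UncheckedIOException(
                new IOException("Stream closed before CRLF terminator"));
    }
}
```

Calling readMessage on a stream that contains "connection 1\r\n" returns the bytes of "connection 1".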

In this case, when there are no threads available, the CALLER_RUNS policy means the IO selector thread does the read itself and then becomes the assembler thread. It blocks waiting for data that will never arrive, because the selector thread is the one that would have read that data later, after handing the assembly off to a different thread. While it is blocked, it also can't handle new accept events.

Here is a log from my test showing the issue...

TRACE: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioServerConnectionFactory - Port 60000 SelectionCount: 2
DEBUG: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Reading...
DEBUG: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Running an assembler
TRACE: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Nio message assembler running...
DEBUG: [May-18 10:43:38,926][myTaskExecutor-1] tcp.serializer.ByteArrayCrLfSerializer - Available to read:0

The second line shows the selector thread being used to do the read; it detects that an assembler is needed for this socket and becomes the assembler itself, blocking while it waits for data.
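The mechanism can be reproduced with the plain JDK executor, without Spring. The sketch below assumes that the question's executor configuration (pool-size="4-8", queue-capacity="0", CALLER_RUNS) roughly corresponds to a ThreadPoolExecutor with a SynchronousQueue and a CallerRunsPolicy; the class and method names are made up for illustration. Once all eight threads are busy, the next task runs on whichever thread submitted it, which in the server is the IO selector thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK APIs are used.
public class CallerRunsDemo {

    /**
     * Saturates the pool, submits one more task, and returns the name of
     * the thread that actually executed that extra task.
     */
    public static String threadThatRunsOverflowTask() {
        // Assumed equivalent of pool-size="4-8", queue-capacity="0", CALLER_RUNS.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 8, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy all 8 threads with tasks that block until released.
            for (int i = 0; i < 8; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            AtomicReference<String> ranOn = new AtomicReference<>();
            // No idle thread and no queue slot: the policy runs this task
            // synchronously in the submitting thread.
            executor.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Submitting thread: " + Thread.currentThread().getName());
        System.out.println("Overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

Both lines printed by main name the same thread. If the overflow task were a blocking assembler instead of a one-liner, the submitting thread would be stuck in it, which matches the behavior described above.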

Do you really believe there will be an issue using an unbounded task executor? These events are generally pretty short lived so threads will be recycled pretty quickly.

Increasing the executor's queue capacity above 0 should help too, but it can't guarantee that the problem won't happen (although a large queue is unlikely to fill up).
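As a rough illustration of why a queue only postpones the problem, the following JDK-only sketch counts how many blocking tasks an executor accepts before CALLER_RUNS hands one back to the submitting thread. It assumes a capacity of 0 maps to a SynchronousQueue and a positive capacity to a bounded LinkedBlockingQueue; the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only JDK APIs are used.
public class QueueCapacityDemo {

    /**
     * Submits blocking tasks to a 4-8 thread pool and returns how many were
     * accepted before one was executed by the submitting thread.
     */
    public static int tasksAcceptedBeforeCallerRuns(int queueCapacity) {
        // Assumption: capacity 0 means no queueing at all.
        BlockingQueue<Runnable> queue = queueCapacity > 0
                ? new LinkedBlockingQueue<Runnable>(queueCapacity)
                : new SynchronousQueue<Runnable>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 8, 60, TimeUnit.SECONDS, queue,
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ranOnCaller = new AtomicBoolean(false);
        Runnable task = () -> {
            if (Thread.currentThread() == caller) {
                // Record the overflow instead of blocking the caller.
                ranOnCaller.set(true);
                return;
            }
            try {
                release.await(); // pool threads stay busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        try {
            while (true) {
                executor.execute(task);
                if (ranOnCaller.get()) {
                    return accepted;
                }
                accepted++;
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("capacity 0:  " + tasksAcceptedBeforeCallerRuns(0));
        System.out.println("capacity 10: " + tasksAcceptedBeforeCallerRuns(10));
    }
}
```

With these settings the method returns 8 for capacity 0 and 18 for capacity 10 (4 core threads, 10 queued tasks, then 4 extra threads). The overflow point moves further out, but it still exists once every thread is blocked and the queue is full.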

I am not yet sure how to fix this, aside from using a dedicated task executor for the IO selector and reader threads so that they are never used as an assembler.



Answer 2:

Yesterday I wrote a sample high-performance TCP server using Spring Integration. I tested it successfully with 1000 concurrent client requests using the JMeter TCP sampler.

Here is the code - https://github.com/rajeshgheware/spring-integration-samples including JMeter test config file.

I successfully tested with 1000 concurrent TCP client requests on a 64-bit laptop with an Intel Core i5 M520 2.4GHz CPU (both the server code and the JMeter test were running on this machine).

I also tried 1500 concurrent client requests, but observed that the server could not serve many of them. I will keep trying to enhance this code to serve 10000 concurrent client requests (I know I may need a good EC2 machine from Amazon for this test :) )