Commons Net FTPClient hangs indefinitely with Mule

Posted 2019-05-26 13:28

I encountered an issue with the Mule ESB FTP transport: when polling, the thread running the client would hang indefinitely without throwing an error. This stops FTP polling completely. Mule uses the Apache Commons Net FTPClient.

Looking further into the code, I think it is caused by the FTPClient's socket timeout (SO_TIMEOUT) not being set, which sometimes makes a read of a reply line from the FTPClient's socket block forever.

We can clearly see the problem in these stack traces, retrieved with jstack when the problem occurred. The __getReply() function seems to be the most direct link to the problem.

This one hangs in the connect() call when creating a new FTPClient:

"receiver.172" prio=10 tid=0x00007f23e43c8800 nid=0x2d5 runnable [0x00007f24c32f1000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
    - locked <0x00000007817a9578> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:154)
    at java.io.BufferedReader.readLine(BufferedReader.java:317)
    - locked <0x00000007817a9578> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:382)
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294)
    at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:364)
    at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:540)
    at org.apache.commons.net.SocketClient.connect(SocketClient.java:178)
    at org.mule.transport.ftp.FtpConnectionFactory.makeObject(FtpConnectionFactory.java:33)
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1188)
    at org.mule.transport.ftp.FtpConnector.getFtp(FtpConnector.java:172)
    at org.mule.transport.ftp.FtpConnector.createFtpClient(FtpConnector.java:637)
    at org.mule.transport.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:134)
    at org.mule.transport.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:94)
    at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:216)
    at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:80)
    at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:49)
    at org.mule.transport.TrackingWorkManager$TrackeableWork.run(TrackingWorkManager.java:267)
    at org.mule.work.WorkerContext.run(WorkerContext.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - <0x00000007817a3540> (a java.util.concurrent.ThreadPoolExecutor$Worker)

And the other one hangs in the pasv() call when using listFiles():

"receiver.137" prio=10 tid=0x00007f23e433b000 nid=0x7c06 runnable [0x00007f24c2fee000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
    - locked <0x0000000788847ed0> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:154)
    at java.io.BufferedReader.readLine(BufferedReader.java:317)
    - locked <0x0000000788847ed0> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:382)
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:490)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:534)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:583)
    at org.apache.commons.net.ftp.FTP.pasv(FTP.java:882)
    at org.apache.commons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:497)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2296)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2269)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2189)
    at org.apache.commons.net.ftp.FTPClient.initiateListParsing(FTPClient.java:2132)
    at org.mule.transport.ftp.FtpMessageReceiver.listFiles(FtpMessageReceiver.java:135)
    at org.mule.transport.ftp.FtpMessageReceiver.poll(FtpMessageReceiver.java:94)
    at org.mule.transport.AbstractPollingMessageReceiver.performPoll(AbstractPollingMessageReceiver.java:216)
    at org.mule.transport.PollingReceiverWorker.poll(PollingReceiverWorker.java:80)
    at org.mule.transport.PollingReceiverWorker.run(PollingReceiverWorker.java:49)
    at org.mule.transport.TrackingWorkManager$TrackeableWork.run(TrackingWorkManager.java:267)
    at org.mule.work.WorkerContext.run(WorkerContext.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - <0x0000000788832180> (a java.util.concurrent.ThreadPoolExecutor$Worker)

I think the problem is caused by the use of the default FTPClient constructor (FTPClient extends SocketClient) in Mule's default FtpConnectionFactory.

Note that the setConnectTimeout() value seems to be used only when calling socket.connect(), and is ignored by every other operation on the same socket:

protected FTPClient createFtpClient()
    {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(connectionTimeout);

        return ftpClient;
    }
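
The difference between a connect timeout and a read timeout can be checked with plain java.net.Socket, without Mule or Commons Net. The following is only a minimal sketch: the class name ConnectVsReadTimeout and the helper soTimeoutAfterConnect() are made up for illustration, and the local ServerSocket merely stands in for an FTP server.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, not part of Mule or Commons Net.
class ConnectVsReadTimeout {

    // Connects with an explicit connect timeout and returns the socket's
    // read timeout (SO_TIMEOUT) as it stands right after the connection.
    static int soTimeoutAfterConnect(int connectTimeoutMillis) {
        // Port 0 lets the OS pick a free port; loopback keeps the demo local.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(server.getInetAddress(), server.getLocalPort()),
                    connectTimeoutMillis);
            return socket.getSoTimeout();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The connect timeout is not carried over to reads:
        // SO_TIMEOUT stays at 0, which means "block forever".
        System.out.println("SO_TIMEOUT after connect: " + soTimeoutAfterConnect(2000));
    }
}
```

The connect timeout only bounds the TCP handshake. Any later read on the socket is governed by SO_TIMEOUT, which has to be set separately.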

It uses the FTPClient() constructor, which itself relies on the SocketClient constructor, where the timeout later applied to the socket is set to 0.

public  SocketClient()
    {
        ...
        _timeout_ = 0;
        ...
    }

And then we call connect(), which calls _connectAction_().

In SocketClient:

protected void  _connectAction_() throws IOException
    {
        ...
        _socket_.setSoTimeout(_timeout_);
        ...
    }

In FTP, a new Reader is instantiated on our socket, which never times out:

protected void _connectAction_() throws IOException
    {
        ...
        _controlInput_ =
            new BufferedReader(new InputStreamReader(_socket_.getInputStream(),
                                                     getControlEncoding()));
        ...
    }

Then, when __getReply() is called, it reads from this Reader on the never-timing-out socket:

private void  __getReply() throws IOException
     {
        ...
         String line = _controlInput_.readLine();
        ...
    }

Sorry for the long post, but I think this needed a proper explanation. A solution may be to call setSoTimeout() just after connect(), to define a socket timeout.

Having a default timeout in the library does not seem to be an acceptable solution, as each user may have different needs and no single default suits every case. https://issues.apache.org/jira/browse/NET-35
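
To illustrate what setting a socket timeout right after connecting changes, here is a minimal sketch using only the JDK. It is not the Commons Net code path: the class ReadTimeoutSketch and the method readTimesOut() are invented for the example, and a local listener that never writes anything plays the role of the silent FTP server.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Mule or Commons Net.
class ReadTimeoutSketch {

    // Returns true if readLine() gave up with a SocketTimeoutException.
    static boolean readTimesOut(int soTimeoutMillis) {
        // The listener never accepts or writes, like a server that stays silent.
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket(silentServer.getInetAddress(), silentServer.getLocalPort())) {
            // Set the read timeout right after connecting, before the first read.
            socket.setSoTimeout(soTimeoutMillis);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
            try {
                reader.readLine(); // with SO_TIMEOUT = 0 this call would block forever
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Read timed out: " + readTimesOut(300));
    }
}
```

With a non-zero SO_TIMEOUT the blocked readLine() fails with an exception that the caller can handle, instead of holding the polling thread forever.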

Finally, this raises 2 questions:

  1. It seems like a bug to me, as it completely stops FTP polling without raising an error. What do you think?
  2. What would be an easy way to avoid this situation? Calling setSoTimeout() from a custom FtpConnectionFactory? Am I missing a configuration option or parameter somewhere?

Thanks in advance.

EDIT: I am using Mule CE Standalone 3.5.0, which seems to use Apache Commons Net 2.0. Looking at the code, Mule CE Standalone 3.7 with Commons Net 2.2 does not seem to behave differently. Here is the source code involved:

https://github.com/mulesoft/mule/blob/mule-3.5.x/transports/ftp/src/main/java/org/mule/transport/ftp/FtpConnectionFactory.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/SocketClient.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/ftp/FTP.java

http://grepcode.com/file/repo1.maven.org/maven2/commons-net/commons-net/2.0/org/apache/commons/net/ftp/FTPClient.java

2 answers
狗以群分
#2 · 2019-05-26 14:10

I reproduced the error in both of my previous cases using MockFtpServer, and I was able to write a custom FtpConnectionFactory which seems to solve the issue.

package my.comp;

import org.apache.commons.net.ftp.FTPClient;
import org.mule.api.endpoint.EndpointURI;
import org.mule.transport.ftp.FtpConnectionFactory;

public class SafeFtpConnectionFactory extends FtpConnectionFactory {

    // Default socket timeout, in milliseconds; access goes through the synchronized accessors
    private static int defaultTimeout = 60000;

    public static synchronized int getDefaultTimeout() {
        return defaultTimeout;
    }

    public static synchronized void setDefaultTimeout(int defaultTimeout) {
        SafeFtpConnectionFactory.defaultTimeout = defaultTimeout;
    }

    public SafeFtpConnectionFactory(EndpointURI uri) {
        super(uri);
    }

    @Override
    protected FTPClient createFtpClient() {
        FTPClient client = super.createFtpClient();

        // Set the default timeout here; it is applied to the socket on connect,
        // replacing the 0 timeout that lets reads hang indefinitely
        client.setDefaultTimeout(getDefaultTimeout());

        return client;
    }
}

And then attaching it to my connector:

<ftp:connector name="archivingFtpConnector" doc:name="FTP"
        pollingFrequency="${frequency}"
        validateConnections="true"
        connectionFactoryClass="my.comp.SafeFtpConnectionFactory">
    <reconnect frequency="${reconnection.frequency}" count="${reconnection.attempt}"/>
</ftp:connector>

Using this configuration, a java.net.SocketTimeoutException will be thrown after the specified timeout, such as:

java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:154)
    at java.io.BufferedReader.readLine(BufferedReader.java:317)
    at java.io.BufferedReader.readLine(BufferedReader.java:382)
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:294)
    at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:364)
    at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:540)
    at org.apache.commons.net.SocketClient.connect(SocketClient.java:178)
    at org.mule.transport.ftp.FtpConnectionFactory.makeObject(FtpConnectionFactory.java:33)
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1188)
    at org.mule.transport.ftp.FtpConnector.getFtp(FtpConnector.java:172)
    at org.mule.transport.ftp.FtpConnector.createFtpClient(FtpConnector.java:637)
    ...

Otherwise, a connect() or pasv() call would hang indefinitely when the server does not respond. I reproduced this exact behavior using MockFtpServer.

Note: I used setDefaultTimeout() because it sets the _timeout_ field that connect() applies to the socket in _connectAction_() (from the SocketClient source):

public abstract class SocketClient
{
    ...
    protected void _connectAction_() throws IOException
    {
        ...
        _socket_.setSoTimeout(_timeout_);
        ...
    }
    ...
    public void  setDefaultTimeout(int timeout)
    {
        _timeout_ = timeout;
    }
    ...
}
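
The pattern in this excerpt (a stored default that is pushed onto the socket at connect time) can be modelled with a few lines of plain Java. This is a simplified, hypothetical stand-in written for illustration, not the Commons Net implementation: MiniSocketClient and soTimeoutAfterConnect() are invented names, and the local ServerSocket only provides something to connect to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical, simplified model of the "default timeout applied on connect" pattern.
class MiniSocketClient {
    private int defaultTimeout = 0; // 0 = reads block forever, as in the excerpt
    private Socket socket;

    void setDefaultTimeout(int timeoutMillis) {
        this.defaultTimeout = timeoutMillis;
    }

    void connect(InetAddress host, int port) throws IOException {
        socket = new Socket(host, port);
        // The stored default becomes the socket's SO_TIMEOUT once connected.
        socket.setSoTimeout(defaultTimeout);
    }

    int getSoTimeout() throws IOException {
        return socket.getSoTimeout();
    }

    void disconnect() throws IOException {
        socket.close();
    }

    // Connects to a throwaway local listener and reports the effective SO_TIMEOUT.
    static int soTimeoutAfterConnect(int defaultTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            MiniSocketClient client = new MiniSocketClient();
            client.setDefaultTimeout(defaultTimeoutMillis);
            client.connect(server.getInetAddress(), server.getLocalPort());
            try {
                return client.getSoTimeout();
            } finally {
                client.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(soTimeoutAfterConnect(60000)); // default set before connect
        System.out.println(soTimeoutAfterConnect(0));     // untouched default
    }
}
```

Because the value is read at connect time, the default has to be set before connect() is called, which is why a factory method that builds the client is a convenient place for it.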

EDIT: For those who are interested, here is the MockFtpServer test code used to reproduce a server that never answers. The infinite loop is far from good practice, though. It should be replaced with something like a bounded sleep, inside a test class that expects a SocketTimeoutException and fails if the call has not returned after a given time.

    private static final int CONTROL_PORT = 2121;

    public void startStubFtpServer(){
        FakeFtpServer fakeFtpServer = new FakeFtpServer();

        //define the command which should never be answered
        fakeFtpServer.setCommandHandler(CommandNames.PASV, new EverlastingCommandHandler());
        //fakeFtpServer.setCommandHandler(CommandNames.CONNECT, new EverlastingConnectCommandHandler());
        //or any other command...

        //server config
        ...

        //start server
        fakeFtpServer.setServerControlPort(CONTROL_PORT);
        fakeFtpServer.start();

        ...

    }

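    //never replies to the command it is registered for (tracking-handler variant)
    public class EverlastingConnectCommandHandler extends org.mockftpserver.core.command.AbstractStaticReplyCommandHandler{
        @Override
        protected void handleCommand(Command cmd, Session session, InvocationRecord rec) throws Exception {
            while(true){
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    //restore the interrupt flag and stop, so the server thread can shut down
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    //never replies to the command it is registered for (fake-server variant)
    public class EverlastingCommandHandler extends AbstractFakeCommandHandler {
        @Override
        protected void handle(Command cmd, Session session) {
            while(true){
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    //restore the interrupt flag and stop, so the server thread can shut down
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }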
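
A bounded check along the lines suggested above (expect a SocketTimeoutException, and fail if the call has not returned after a given time) could be sketched as follows. This uses only the JDK rather than MockFtpServer or a test framework, and every name in it (BoundedTimeoutCheck, Outcome, probe()) is invented for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of Mule, Commons Net, or MockFtpServer.
class BoundedTimeoutCheck {

    enum Outcome { TIMED_OUT, HUNG, ANSWERED }

    // Reads from a server that never answers, but never waits longer than guardMillis.
    static Outcome probe(int soTimeoutMillis, long guardMillis) {
        // Daemon worker so that a stuck read can never keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "probe-reader");
            t.setDaemon(true);
            return t;
        });
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket(silentServer.getInetAddress(), silentServer.getLocalPort())) {
            socket.setSoTimeout(soTimeoutMillis);
            Future<Boolean> read = executor.submit(() -> {
                try {
                    socket.getInputStream().read();
                    return false;               // the server answered or closed
                } catch (SocketTimeoutException e) {
                    return true;                // the expected read timeout
                }
            });
            try {
                return read.get(guardMillis, TimeUnit.MILLISECONDS)
                        ? Outcome.TIMED_OUT : Outcome.ANSWERED;
            } catch (TimeoutException e) {
                // The guard fired first. Closing the socket when this try block
                // exits unblocks the reader thread.
                return Outcome.HUNG;
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe(300, 3000)); // read timeout set
        System.out.println(probe(0, 500));    // no read timeout: the guard reports the hang
    }
}
```

The guard on Future.get() is what keeps the test itself from hanging when the timeout under test is missing, so a regression shows up as a failure rather than as a stuck build.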
Evening l夕情丶
#3 · 2019-05-26 14:20

In an ideal world the timeout should not be necessary, but it looks like it is in your case.

Your description is very comprehensive; have you considered raising a bug?

As a workaround, I would suggest first trying "Response Timeout" in the advanced tab. If that doesn't work, I would use a service override; from there you should be able to override the receiver.
