Spring integration outbound channel adapters not c

2019-07-10 00:37发布

问题:

We are using spring integration adapters for file ftp in our project, the problem we are facing is, the adapters are not closing the open socket connections. As a result, other modules which are in the same managed server are failing with "Too many open files" socket connection exception. Is there a way to close the unused open socket connections from the channel adapters Or Can we get the underlying jsch connections and close the sockets from sftp channel adapters.

We have tried caching session factory and it did not close the open sockets. The file handles kept on piling up. Thanks in advance for the inputs.

We have two xmls one with outboundAdapter and the other with InboundAdapter. These two are in different xmls as they are different jobs that are run using spring batch. We are expected to send files to a location. 

We are using spring batch 2.2.0 and spring integration 2.1.6 and spring integration 2.1.6.

Here is the configuration: We have one session factory and it is wrapped by cachingSession factory:

<beans:bean id="sftpSessionFactory"
  class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="hostname"/>
    <beans:property name="privateKey" value="somepath"/>
    <beans:property name="port" value="22"/>
</beans:bean>

<bean id="cachingSessionFactory"
  class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>
**and then we have a channel**
<int:channel id="ftpChannel" />

**and then we have the following outbound Channel adapter**
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
    session-factory="cachingSessionFactory"
    channel="inputChannel"
    charset="UTF-8"
    use-temporary-filename="false"/>

**With the above configuration we are using the ftpChannel to send the files by constructing a payload like this:**
message = MessageBuilder.withPayLoad(f).build() // MessageBuilder is //org.springframework.integration.support.MessageBuilder and f is the file

ftpChannel.send(message)

**In another inbound job, the following is the configuration of adapters:
Session factory:**



<beans:bean id="sftpSessionFactory2"
  class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="hostname"/>
    <beans:property name="privateKey" value="somepath"/>
    <beans:property name="port" value="22"/>
</beans:bean>
**Caching session factory:**
<bean id="cachingSessionFactory2"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory2"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>
 **and another channel:**
<int:channel id="ftpChannel2" />
 **Now we have the following adapter in this xml:**
<int-sftp:outbound-channel-adapter id="sftpInboundAdapter"
    session-factory="cachingSessionFactory2"
    channel="inputChannel"
    charset="UTF-8"
    use-temporary-filename="false"/>

With this configuration in the above xml we are trying to get session from the cachingSessionFactory configured in the first xml, getting a session out of it, getting a list of files and then sending some files with ftpChannel2.send() and doing session.close() in finally block. When I do session.isOpen() in after session.close(), I see true being returned.

With these two jobs, I could see a lot of open file handles, which are socket connections and I am absolutely clueless as to how I can close those opened sockets.

回答1:

The session will be closed when the operation is complete as long as you don't use the caching session factory - that is intended to keep the session open for the next use.

If you turn on DEBUG logging, you should get some insight into what it wrong.

EDIT

Just ran this with no problems:

@Test
public void test() throws Exception {
    DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
    sf.setHost("10.0.0.3");
    sf.setUsername("ftptest");
    sf.setPassword("ftptest");
    FtpSession session = sf.getSession();
    Thread.sleep(10000);
    session.close();
    assertFalse(session.isOpen());
    System.out.println("closed");
    Thread.sleep(10000);
}

During the first sleep netstat -ntp shows the socket open; socket is gone after the close.

The session is the socket...

public void disconnect() throws IOException
{
    closeQuietly(_socket_);
    ...
}

EDIT2

I had forgotten that with 2.1.x there was the cache-sessions attribute (2.1.x is very old).

I just tested with this (and 2.1.6) ...

<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="10.0.0.3" />
    <property name="privateKey" value="file:/somPathTo/.ssh/id_rsa" />
    <property name="port" value="22" />
    <property name="user" value="ftptest" />
</bean>

<int:channel id="inputChannel" />

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
    session-factory="sftpSessionFactory"
    channel="inputChannel"
    charset="UTF-8"
    cache-sessions="false"
    use-temporary-file-name="false"
    remote-directory="." />

public class Main {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
        File f = new File("foo.txt");
        FileOutputStream fos = new FileOutputStream(f);
        fos.write("bar".getBytes());
        fos.close();
        context.getBean("inputChannel", MessageChannel.class).send(MessageBuilder.withPayload(f).build());
        System.out.println("Sleeping - check socket");
        Thread.sleep(60000); // check socket
        context.close();
        System.exit(0);
    }

}

With no problems (the socket is closed); if I set the cache-sessions to true, the socket remains open as expected.

I do notice you don't have a remote-directory attribute - that's illegal:

exactly one of 'remote-directory' or 'remote-directory-expression' is required on a remote file outbound adapter