How polling works for FTP inbound channel adapter

Posted 2019-02-20 21:10

Question:

I have a use case where I need to pick up files from an FTP location and place them in a server location. I am using ftp-inbound-channel-adapter (Spring Integration 2.0.4) to achieve this. Below is the configuration in my XML:

 <bean id="ftpAASessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
          <property name="host" value="${ftp.session.host}" />
          <property name="port" value="${ftp.session.port}" />
          <property name="username" value="${ftp.session.username}" />
          <property name="password" value="${ftp.session.password}" />
          <property name="clientMode" value="0" />
          <property name="fileType" value="0" />
   </bean>


   <ftp:inbound-channel-adapter id="ftpAAInbound"
          channel="ftpChannel" session-factory="ftpAASessionFactory" charset="UTF-8"
          auto-create-local-directory="false" delete-remote-files="true"
          remote-directory="${ftp.source.location}" local-directory="file://${ftp.target.location}"
          >
          <int:poller max-messages-per-poll="5" cron="0 */2 * ? * *">
          </int:poller>

   </ftp:inbound-channel-adapter>

   <int:channel id="ftpChannel">
          <int:queue />
          <int:interceptors>
                 <int:wire-tap channel="debugLogger" />
          </int:interceptors>
   </int:channel>

   <int:logging-channel-adapter id="debugLogger"
          level="DEBUG" log-full-message="true" />

   <int:logging-channel-adapter id="errorLogger"
          level="ERROR" log-full-message="true" />

I have set max-messages-per-poll to 5 and scheduled polling at every even minute (using a cron expression).
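To make the schedule concrete, here is a minimal plain-Java sketch of what "second 0 of every even minute" means. It is a hand-written equivalent of the cron expression above, not Spring's cron parser; the class name `EvenMinuteSchedule`, the method `nextFire`, and the sample start time are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hand-written model of the schedule "0 */2 * ? * *" (assumption: not Spring's parser).
class EvenMinuteSchedule {

    /** Returns the next time strictly after 'now' that has second 0 and an even minute. */
    static LocalDateTime nextFire(LocalDateTime now) {
        // Move to the start of the next minute.
        LocalDateTime t = now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        // Skip odd minutes; minute 59 + 1 rolls over to minute 0, which is even.
        if (t.getMinute() % 2 != 0) {
            t = t.plusMinutes(1);
        }
        return t;
    }

    public static void main(String[] args) {
        LocalDateTime first = nextFire(LocalDateTime.of(2017, 3, 1, 12, 37, 15));
        System.out.println(first);
        System.out.println(nextFire(first));
    }
}
```

Starting from 12:37:15, the first two fire times are 12:38:00 and 12:40:00, which are the two poll times that appear in the logs.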

My problem is that if there are 6 files in the FTP location, all 6 files are transferred to the server location on the first poll (with max-messages-per-poll = 5, I expected only 5 files to be picked up from the FTP location), while messages are created for only 5 of them.

I want only 5 files to be transferred to my server on the first poll; the second poll should pick up the remaining one.

Please suggest a solution. Thanks in advance.

Below are the logs from when there were 6 files in the FTP location:

    [CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2015_02_01_07_50_01_102_20.txt
[CRA] [01/03/2017 12:38:00] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:00] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_808_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_01.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2015_02_01_07_50_01_102_21.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_809_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FtpSession.read(79) | File has been successfully transfered from: /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FtpInboundFileSynchronizer.copyFileToLocalDirectory(219) | deleted /FTPManifestSrcDev/R2_2a/FTPManifestSrcDev/ABC_810_2017_02_22_07_50_01_102_02.txt
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] FileReadingMessageSource.scanInputDirectory(272) | Added to queue: [D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt, D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt, D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt, D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt, D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2015_02_01_07_50_01_102_20.txt][Headers={timestamp=1488352081732, id=46536ab1-c0bd-4cf4-9867-b7d99e462ed5}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_808_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081784, id=336045cf-0abd-4b1d-b698-d82c230e4b1f}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_01.txt][Headers={timestamp=1488352081786, id=75029ba5-4857-4a4e-832f-b8c657b539e3}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2015_02_01_07_50_01_102_21.txt][Headers={timestamp=1488352081789, id=edea505f-37a2-4c96-8034-b3c74f55f9de}]
[CRA] [01/03/2017 12:38:01] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:38:01] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_809_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352081792, id=5123c737-02d1-4846-9001-011796d92aa0}]
[CRA] [01/03/2017 12:40:00] INFO [task-scheduler-8] FileReadingMessageSource.receive(260) | Created message: [[Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.preSend(224) | preSend on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.preSend(224) | preSend on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessage(67) | org.springframework.integration.handler.LoggingHandler#0 received message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] LoggingHandler.handleMessageInternal(141) | [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DirectChannel.postSend(237) | postSend (sent=true) on channel 'debugLogger', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] QueueChannel.postSend(237) | postSend (sent=true) on channel 'ftpChannel', message: [Payload=D:\applications\files\local\ABC_810_2017_02_22_07_50_01_102_02.txt][Headers={timestamp=1488352200005, id=7a0a0ea6-e573-4981-9e2f-89ae0f646b50}]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] DefaultFtpSessionFactory.createClient(158) | Connected to server [prgrear01.group.root.ad:21]
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(91) | Poll resulted in Message: null
[CRA] [01/03/2017 12:40:00] DEBUG [task-scheduler-8] SourcePollingChannelAdapter.doPoll(101) | Received no Message during the poll, returning 'fal

.......

Answer 1:

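That log shows 6 files. All 6 are fetched to the local directory on the first poll; max-messages-per-poll only limits how many messages are emitted per poll, not how many files are fetched. The adapter is configured for 5 messages per poll, and the log clearly shows that 5 messages were sent at 12:38 and 1 at 12:40, after which no seventh file was found: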

Poll resulted in Message: null
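The behaviour in that log can be reproduced with a toy model in plain Java. This is only a sketch of the two stages the log suggests (synchronize everything, then emit up to the per-poll limit); it is not Spring Integration code, and the class `PollingModel` and its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of the adapter's two stages as seen in the log (assumption, not the real implementation).
class PollingModel {

    private final List<String> remote;                            // files still on the FTP server
    private final Deque<String> localQueue = new ArrayDeque<>();  // fetched locally, not yet emitted
    private final int maxMessagesPerPoll;

    PollingModel(List<String> remoteFiles, int maxMessagesPerPoll) {
        this.remote = new ArrayList<>(remoteFiles);
        this.maxMessagesPerPoll = maxMessagesPerPoll;
    }

    /** One poll: emits at most maxMessagesPerPoll messages. */
    List<String> poll() {
        List<String> emitted = new ArrayList<>();
        while (emitted.size() < maxMessagesPerPoll) {
            if (localQueue.isEmpty()) {
                // Stage 1: nothing queued locally, so fetch ALL remote files
                // (and remove them remotely, as with delete-remote-files="true").
                localQueue.addAll(remote);
                remote.clear();
            }
            // Stage 2: emit one message for the next local file, if any.
            String next = localQueue.poll();
            if (next == null) {
                break; // corresponds to "Poll resulted in Message: null"
            }
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        PollingModel model = new PollingModel(Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6"), 5);
        System.out.println("Poll 1 emitted " + model.poll().size() + " messages");
        System.out.println("Poll 2 emitted " + model.poll().size() + " messages");
    }
}
```

In this model the limit applies only to the emit stage, so the first poll copies all six files but emits five messages, and the second poll emits the sixth before finding nothing more to fetch.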

EDIT

Here is another way to achieve your desired result (using outbound gateways). This version uses the Java DSL.

If you prefer to use XML configuration, a similar flow is provided in the ftp-sample; you would have to insert the file limiter between the ls gateway and the splitter.

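import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.net.ftp.FTPClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
// Java DSL 1.x package; in Spring Integration 5+ this class is org.springframework.integration.ftp.dsl.Ftp
import org.springframework.integration.dsl.ftp.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpFileInfo;

@SpringBootApplication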
public class So42528316Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42528316Application.class, args);
        try {
            context.getBean(So42528316Application.class).runDemo();
        }
        finally {
            context.close();
        }
    }

    @Autowired
    private FileGateway fetchAndProcess;

    private void runDemo() throws Exception {
        Collection<Boolean> rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        while (rmResults != null) {
            System.out.println("Processed " + rmResults.size() + " files");
            Thread.sleep(10_000);
            rmResults = this.fetchAndProcess.processFilesAt("so42528316");
        }
        System.out.println("No more files available");
    }

    @MessagingGateway(defaultRequestChannel = "flow.input", defaultReplyTimeout = "0")
    public interface FileGateway {

        Collection<Boolean> processFilesAt(String pattern);

    }

    @Bean
    public DefaultFtpSessionFactory sessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("10.0.0.3");
        factory.setUsername("ftptest");
        factory.setPassword("ftptest");
        factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return factory;
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .handle(Ftp.outboundGateway(sessionFactory(), "ls", "payload"))
                .handle("so42528316Application", "limitFiles")
                .split()
                .handle(Ftp.outboundGateway(sessionFactory(), "get",
                                "payload.remoteDirectory + '/' + payload.filename")
                        .localDirectory(new File("/tmp", "so42528316")))
                .handle("so42528316Application", "process")
                .handle(Ftp.outboundGateway(sessionFactory(), "rm",
                        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
                .aggregate();
    }

    public List<FtpFileInfo> limitFiles(List<FtpFileInfo> files) {
        // Add any logic you want here, e.g. check if file already on local disk.
        if (files.size() == 0) {
            return null;
        }
        else if (files.size() > 2) {
            System.out.println("Reducing fetch list from " + files.size() + " to 2");
            return files.stream().limit(2).collect(Collectors.toList());
        }
        else {
            return files;
        }
    }

    public String process(File file) {
        System.out.println("Processing " + file);
        file.delete();
        return file.getName();
    }

}

Result:

Reducing fetch list from 3 to 2
Processing /tmp/so42528316/bar.txt
Processing /tmp/so42528316/baz.txt
Processed 2 files
Processing /tmp/so42528316/foo.txt
Processed 1 files
No more files available
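The comment in limitFiles suggests adding your own logic, such as checking whether a file is already on local disk. A minimal standalone sketch of such a limiter follows, working on plain file names so it runs without Spring. The class `FileLimiter`, the `limit` method, and the `alreadyLocal` set are hypothetical; in the flow above you would adapt the idea to `List<FtpFileInfo>`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch of a file limiter (assumption: operates on names, not FtpFileInfo).
class FileLimiter {

    /**
     * Returns at most maxFiles remote names that are not already present locally,
     * or null when nothing is left to fetch (in the flow above, a null return
     * ends the flow, so the gateway call returns null).
     */
    static List<String> limit(List<String> remoteNames, Set<String> alreadyLocal, int maxFiles) {
        List<String> candidates = remoteNames.stream()
                .filter(name -> !alreadyLocal.contains(name)) // skip files we already have
                .limit(maxFiles)                              // cap the batch size
                .collect(Collectors.toList());
        return candidates.isEmpty() ? null : candidates;
    }

    public static void main(String[] args) {
        List<String> remote = Arrays.asList("bar.txt", "baz.txt", "foo.txt");
        Set<String> local = new HashSet<>(Collections.singletonList("bar.txt"));
        System.out.println(limit(remote, local, 2));
    }
}
```

Filtering before limiting matters here: limiting first could fill the batch with files that are then skipped, leaving fewer than maxFiles to fetch.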