I have an SFTP directory. I read files from it and send them to a ServiceActivator for further processing. I need the handler to process these files in parallel.
Here is my Spring Integration Java DSL flow:
    IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                    .temporaryFileSuffix("COPY")
                    .localDirectory(directory)
                    .deleteRemoteFiles(false)
                    .preserveTimestamp(true)
                    .remoteDirectory("remoteDir")
                    .patternFilter("*.txt"),
            e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
        .handle("mybean", "myMethod")
        .handle(Files.outboundAdapter(new File("success"))
                .deleteSourceFiles(true)
                .autoCreateDirectory(true))
        .get();
Update: Here is my ThreadPoolTaskExecutor:
    @Bean(name = "executor")
    public Executor getExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(20);
        executor.initialize();
        return executor;
    }
The Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) returns remote files one by one anyway. First it synchronizes them to the local directory, and only after that does it poll them for message processing as File payloads.
To process them in parallel, it is enough to add a taskExecutor to your e.poller() definition, and all those maxMessagesPerPoll(5) messages will be distributed to different threads.
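A rough sketch of how that could look, reusing the flow and the executor bean from the question. The imports assume Spring Integration 5.x package names (older DSL releases place Sftp, Files and Pollers in different packages), and the class name SftpParallelFlowConfig, the injected sftpSessionFactory parameter and the "local-sync" directory are placeholders I made up for illustration:

```java
import java.io.File;
import java.util.concurrent.Executor;

import com.jcraft.jsch.ChannelSftp;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class; package names assume Spring Integration 5.x.
@Configuration
public class SftpParallelFlowConfig {

    // Same executor as in the question: 4 worker threads, queue of 20 tasks.
    @Bean(name = "executor")
    public Executor getExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(20);
        executor.initialize();
        return executor;
    }

    @Bean
    public IntegrationFlow sftpFlow(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        return IntegrationFlows
                .from(Sftp.inboundAdapter(sftpSessionFactory)
                                .temporaryFileSuffix("COPY")
                                // "local-sync" is a placeholder for the question's 'directory'
                                .localDirectory(new File("local-sync"))
                                .deleteRemoteFiles(false)
                                .preserveTimestamp(true)
                                .remoteDirectory("remoteDir")
                                .patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedDelay(500)
                                .maxMessagesPerPoll(5)
                                // the only change: hand the polled work to the thread pool
                                .taskExecutor(getExecutor())))
                .handle("mybean", "myMethod")
                .handle(Files.outboundAdapter(new File("success"))
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                .get();
    }
}
```

Because everything downstream of the poller now runs on pool threads, myMethod on mybean can be invoked concurrently and has to be thread-safe.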