I am newbie to Spring Integration. I am working on solution, but I am stuck on a specific issue while using inbound file adapter ( FileReadingMessageSource ). I have to read files from different directories and process them and save the files in different directories. As I understand, the directory name is fixed at the start of the flow. Can some one help me on changing the directory name for different requests.
I attempted the following. First of all, I am not sure whether it is correct way to about and although it worked for only one directory. I think Poller was waiting for more files and never came back to read another directory.
@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SiSampleFileProcessor {
@Autowired
MyFileProcessor myFileProcessor;
@Value("${si.outdir}")
String outDir;
@Autowired
Environment env;
public static void main(String[] args) throws IOException {
ConfigurableApplicationContext ctx = new SpringApplication(SiSampleFileProcessor.class).run(args);
FileProcessingService gateway = ctx.getBean(FileProcessingService.class);
boolean process = true;
while (process) {
System.out.println("Please enter the input Directory: ");
String inDir = new Scanner(System.in).nextLine();
if ( inDir.isEmpty() || inDir.equals("exit") ) {
process=false;
} else {
System.out.println("Processing... " + inDir);
gateway.processFilesin(inDir);
}
}
ctx.close();
}
@MessagingGateway(defaultRequestChannel="requestChannel")
public interface FileProcessingService {
String processFilesin( String inputDir );
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1000).get();
}
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "requestChannel")
@Bean
GenericHandler<String> fileReader() {
return new GenericHandler<String>() {
@Override
public Object handle(String p, Map<String, Object> map) {
FileReadingMessageSource fileSource = new FileReadingMessageSource();
fileSource.setDirectory(new File(p));
Message<File> msg;
while( (msg = fileSource.receive()) != null ) {
fileInChannel().send(msg);
}
return null; // Not sure what to return!
}
};
}
@Bean
public MessageChannel fileInChannel() {
return MessageChannels.queue("fileIn").get();
}
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.handle(myFileProcessor)
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
}
EDIT: Based on Gary's response replaced some methods as
@MessagingGateway(defaultRequestChannel="requestChannel")
public interface FileProcessingService {
boolean processFilesin( String inputDir );
}
@ServiceActivator(inputChannel = "requestChannel")
public boolean fileReader(String inDir) {
FileReadingMessageSource fileSource = new FileReadingMessageSource();
fileSource.setDirectory(new File(inDir));
fileSource.afterPropertiesSet();
fileSource.start();
Message<File> msg;
while ((msg = fileSource.receive()) != null) {
fileInChannel().send(msg);
}
fileSource.stop();
System.out.println("Sent all files in directory: " + inDir);
return true;
}
Now it is working as expected.
You can use this code
FileProcessor.java
LastModifiedFileFilter.java
Main Class= DemoApplication.java
The
FileReadingMessageSource
uses aDirectoryScanner
internally; it is normally set up by Spring after the properties are injected. Since you are managing the object outside of Spring, you need to call Spring bean initialization and lifecycle methodsafterPropertiesSet()
,start()
andstop()
.Call
stop()
when the receive returns null.If you return nothing, your calling thread will hang in the gateway waiting for a response. You could change the gateway to return
void
or, since your gateway is expecting a String, just return some value.However, your calling code is not looking at the result anyway.
Also, remove the
@Bean
from the@ServiceActivator
; with that style, the bean type must beMessageHandler
.