I'm extremely new to Spring, and even more-so to Spring Integration, so apologies if this is a very basic question.
I'm wanting to build a very basic log file processor to learn the ropes. Very similar to this: example
I'm also wanting to use a java config approach, and most of the examples I've been following are all XML driven and I'm having a hard time doing the translation.
Ultimately I'd like to recursively poll a source directory for log files and use a persistence store to keep track of what's been found.
Then, copy those files to be processed to a processing folder, and then kick off a spring batch job to process the contents of the file.
When everything completes the processed file can be deleted from the processing location.
I can't seem to figure out the proper way to wire up (using general java config of SpEL) the flow. Also, I'm still very unsure of what the proper pieces should be.
Again something along these basic, high-level lines for the file moving:
file:inbound-channel-adapter -> channel -> file:outbound-adapter
Here is what I have so far
EDIT
I've updated with Artem's solution. My source files are now properly copied to the destination location. Thanks Artem!
Ultimately I am still facing the same problem. The files to be scanned are found immediately (and the metadata-store.properties files is populated immediately) but the files are slowly copied to the destination folder. If a crash happens, any source files that have not been copied to the destination folder will essentially be "lost". Perhaps I need to look at other forms of persistence stores, like a custom jdbcfilter.
@Value("${logProcessor.filenamePattern}")
private String filenamePattern;
@Value("${logProcessor.sourceDirectory}")
private String sourceDirectory;
@Value("${logProcessor.processingDirectory}")
private String processingDirectory;
@Bean
@InboundChannelAdapter(channel = "sourceFileChannel", poller = @Poller(fixedRate = "5000"))
public MessageSource<File> sourceFiles() {
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new SimplePatternFileListFilter(filenamePattern));
filters.addFilter(persistentFilter());
FileReadingMessageSource source = new FileReadingMessageSource();
source.setAutoCreateDirectory(true);
source.setDirectory(new File(sourceDirectory));
source.setFilter(filters);
source.setUseWatchService(true);
return source;
}
@Bean
@InboundChannelAdapter(channel = "processingFileChannel", poller = @Poller(fixedRate = "5000"))
public MessageSource<File> processingFiles() {
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new SimplePatternFileListFilter(filenamePattern));
FileReadingMessageSource source = new FileReadingMessageSource();
source.setAutoCreateDirectory(true);
source.setDirectory(new File(processingDirectory));
source.setFilter(filters);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sourceFileChannel")
public MessageHandler fileOutboundChannelAdapter() {
FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(processingDirectory));
adapter.setDeleteSourceFiles(false);
adapter.setAutoCreateDirectory(true);
adapter.setExpectReply(false);
return adapter;
}
@Bean
public MessageChannel sourceFileChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel processingFileChannel() {
return new DirectChannel();
}
@Bean
public DefaultDirectoryScanner defaultDirectoryScanner() {
return new DefaultDirectoryScanner();
}
@Bean
public FileSystemPersistentAcceptOnceFileListFilter persistentFilter() {
FileSystemPersistentAcceptOnceFileListFilter fileSystemPersistentAcceptOnceFileListFilter = new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(), "");
fileSystemPersistentAcceptOnceFileListFilter.setFlushOnUpdate(true);
return fileSystemPersistentAcceptOnceFileListFilter;
}
@Bean
public PropertiesPersistingMetadataStore metadataStore(){
PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
metadataStore.setBaseDirectory("C:\\root\\code\\logProcessor");
return metadataStore;
}
You config is good so far.
Having such a complex task, I'm not sure how to help you.
You should ask more specific question. We can't write the solution for you.
Not sure why you need to copy files from one dir to another, if you can simply poll them from the source dir, store in the
metadataStore
and start a file processing.So, far I see a small problem in your config. The
FileWritingMessageHandler
sends results to theprocessingFileChannel
and the same is done by the secondFileReadingMessageSource
. I'm not sure that it is your intention. Just in case to pay your attention.You might also need to know about
FileSplitter
, which lets you to process file line by line.Also you say
processingDirectory
, but then you usetmpDir
for theFileWritingMessageHandler
, which, I guess, assumes your copy logic.Let's do the task step by step! And then you figure out what, where and how to use!
EDIT
If you need just copy file to the
processingDirectory
without any reply, you should do one-way adapter:And then that your
@InboundChannelAdapter(channel = "processingFileChannel"
is good to pick up files for processing.Not sure that you need to
DeleteSourceFiles
though...