I've a requirement to download a file from S3 based on a message content. In other words, the file to download is previously unknown, I've to search and find it at runtime. S3StreamingMessageSource doesn't seem to be a good fit because:
- It relies on polling where as I need to wait for the message.
- I can't find any way to create a
S3StreamingMessageSource
dynamically in the middle of a flow. gateway(IntegrationFlow)
looks interesting but what I need is a gateway(Function<Message<?>, IntegrationFlow>)
that doesn't exist.
Another candidate is S3MessageHandler but it has no support for listing files which I need for finding the desired file.
I can implement my own message handler using AWS API directly, just wondering if I'm missing something, because this doesn't seem like an unusual requirement. After all, not every app just sits there and keeps polling S3 for new files.
There is S3RemoteFileTemplate
with the list()
function which you can use in the handle()
. Then split()
result and call S3MessageHandler
for each remote file to download.
Although the last one has functionality to download the whole remote dir.
For anyone coming across this question, this is what I did. The trick is to:
- Set filters later, not at construction time. Note that there is no
addFilters
or getFilters
method, so filters can only be set once, and can't be added later. @artem-bilan, this is inconvenient.
Call S3StreamingMessageSource.receive
manually.
.handle(String.class, (fileName, h) -> {
if (messageSource instanceof S3StreamingMessageSource) {
S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilters(
new S3SimplePatternFileListFilter("**/*/*.json.gz"),
new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
new S3FileListFilter(fileName)
);
s3StreamingMessageSource.setFilter(chainFileListFilter);
return s3StreamingMessageSource.receive();
}
log.warn("Expected: {} but got: {}.",
S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
return messageSource.receive();
}, spec -> spec
.requiresReply(false) // in case all messages got filtered out
)