I went through the Spring Integration guide and the examples here and got a working sample for Spring Integration SFTP program. I already have a working Spring Batch program that reads a bunch of file and dumps into the Database.
I am now trying to integrate both Spring Batch and Spring Integration programs by going through the Spring docs and I created the below configuration.
<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}"/>
<property name="port" value="${port}"/>
<property name="user" value="${user}"/>
<property name="password" value="${password}"/>
</bean>
<int:channel id="inboundFileChannel"><int:queue/></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:/chofac/data/mex/registry/outbox"
remote-directory="/chofac/SFTP/MEX/outbox"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.txt">
<int:poller max-messages-per-poll="-1" fixed-rate="1000" />
</int-sftp:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
<!-- <property name="fileParameterName" value="input.file.name"/> -->
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
<batch:job id="responseFileReaderJob">
<batch:step id="dailyReaderJob">
<batch:tasklet>
<batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
I am running this program using the test case below:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {
@Resource(name="inboundFileChannel")
PollableChannel localFileChannel;
@Test
public void runDemo(){
System.out.println("Received first file message: " + localFileChannel.receive());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
I get this error:
2014-07-01 10:51:48,987 [main] INFO org.hibernate.impl.SessionFactoryImpl - closing 2014-07-01 10:51:48,988 [main] DEBUG org.springframework.beans.factory.support.DisposableBeanAdapter - Invoking destroy method 'close' on bean with name 'dataSource' 2014-07-01 10:51:48,988 [main] ERROR org.springframework.test.context.TestContextManager - Caught exception while allowing TestExecutionListener [org.springframework.test.context.support.DependencyInjectionTestExecutionListener@2faadcc6] to prepare test instance [com.chofac.mint.integration.AAASftpInboundMsgJobTriggerTest@5d3ad33d] java.lang.IllegalStateException: Failed to load ApplicationContext at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:99) at org.springframework.test.context.DefaultTestContext.getApplicationContext(DefaultTestContext.java:101) at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:109) at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:75) at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:326) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:212) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:232) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71) at org.junit.runners.ParentRunner.run(ParentRunner.java:300) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:175) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197) Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482) at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:121) at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60) at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:100) at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:250) at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContextInternal(CacheAwareContextLoaderDelegate.java:64) at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:91) ... 25 more Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. at org.springframework.util.Assert.notNull(Assert.java:112) at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238) at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549) ... 40 more 2014-07-01 10:51:48,992 [main] DEBUG org.springframework.test.context.support.DirtiesContextTestExecutionListener - After test class: context [DefaultTestContext@ac44b88 testClass = SftpInboundMsgJobTriggerTest, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@4102799c testClass = SftpInboundMsgJobTriggerTest, locations = '{classpath:META-INF/spring/applicationContext.xml, classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]], dirtiesContext [false].
I experimented with most of the top answers on this issue from Google and the suggestions from StachOverflow as I typed this question, all of them resulted in different other errors and I seem to be getting diverted from the main issue.
Most common suggestion was to add a global poller, but this resulted in the error below:
<int:poller default="true" fixed-delay="50"/>
Caused by: java.lang.IllegalArgumentException: A poller should not be specified for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1', since 'outboundJobRequestChannel' is a SubscribableChannel (not pollable).
(I am a newbie in all of these, Spring, Spring Batch and Spring Integration) Any help would be greatly appreciated. Thanks in advance.
Update 1
I removed the poller in the #2 like the below
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
and removed the global poller
<int:poller default="true" fixed-delay="50"/>
I get the below exception:
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482) at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:93) at com.chofac.mint.integration.DownloadFileRunBatch.main(DownloadFileRunBatch.java:15) Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context. at org.springframework.util.Assert.notNull(Assert.java:112) at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238) at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549) ... 12 more
However... if I leave the global poller the SFTP transfer happens and the job gets triggered
<int:poller default="true" fixed-delay="50"/>
Update 2
I get the below exception if I remove
Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979) at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)
Here's my configuration again:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.*">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
<!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>
<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
I am triggering this from a Main program as below:
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
Update 3
Sample Configuration, JUnit, and Spring example on the web.
Update 4
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern ="*.*"
local-filter="acceptAllFileListFilter">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
program runs into a continuous loop with exceptions. I feel the polling happens very frequently (every second) though I have specified 45 seconds for the poller Full configuration and logs here.
outboundJobRequestChannel
is aSubscribableChannel
so you cannot have a<poller/>
on batch-int:job-launching-gateway.inboundFileChannel
is aQueueChannel
so its consumer needs a poller (the transformer).Notice
#0
,#1
in the bean names. Theint-sftp:inbound-channel-adapter
(#0
) correctly has a poller; just remove the other one.You don't really need a global (default) poller for this case (but it is being used for your transformer).
EDIT:
(reply to the comment on your question).
That is the default behavior. By default, the
local-filter
is anAcceptOnceFileListFilter
. If you remove the file before the next poll, you can change this to anAcceptAllFileListFilter
.If you want to leave the file on disk, but detect it has been changed, use a
FileSystemPersistentAcceptOnceFileListFilter
; since you are not deleting the remote file, you should also set thefilter
to anSftpPersistentAcceptOnceFileListFilter
(actually aCompositeFileListFilter
that wraps this filter and one of the pattern filters).Otherwise, you'll keep fetching the same file and, without
preseve-timestamp
, the local filter will think it's a new file each time.The persistent filters use a metadata store and use the file name and
lastModified
timestamp to determine if the filter should accept (pass) the file.EDIT2:
That sample just dumps the file into a queue channel and received in the
main
; you have a transformer subscribed.It's this code...
That's causing you the current grief - just remove those lines from your main.