Spring integration DSL running Sftp OutBound flow

2019-08-15 00:14发布

I have created two integration flow, one is File inbound flow which is lookup my local directory and other one is Sftp outbound flow which is send those files from local to remote directory. I am running these flows from Main class i.e without using the Junit. So how cound I send message from File Inbound flow to Sftp outblound flow. in Junit Test method I can use send() method for file transferring but in this case not able to send the files. So can any one help me as I am not able to proceed.

This is my File inbound flow..

@Bean
public IntegrationFlow fileReadingFlow() {          
    return IntegrationFlows
            .from(fileMessageSource(),
                    new Consumer<SourcePollingChannelAdapterSpec>() {

                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.autoStartup(true).poller(Pollers.fixedRate(6000)
                                            .maxMessagesPerPoll(1));
                        }
                    }).transform(Transformers.fileToByteArray())
            .channel(MessageChannels.queue("fileReadingResultChannel"))
            .get();
}

@Bean
public MessageSource<File> fileMessageSource() {

    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(new File(localDir));    
    source.setAutoCreateDirectory(true);        
    return source;
}

This is my sftp outbound flow..

@Bean
public IntegrationFlow sftpOutboundFlow() {
    System.out.println("enter out  bound flow.....");
    return IntegrationFlows
            .from("toSftpChannel")
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory)
                    .remoteFileSeparator("\\")
                    .useTemporaryFileName(false)
                    .remoteDirectory(remDir)).get();
}

As suggested by Gary, My new flow is....

@Bean
public IntegrationFlow fileReadingFlow() {          
    return IntegrationFlows
            .from("fileReadingResultChannel")
            .transform(Transformers.fileToByteArray())
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory)
                    .remoteFileSeparator("\\")
                    .useTemporaryFileName(false)
                    .remoteDirectory(remDir))
            //.channel(this.fileReadingResultChannel)
                    .channel(MessageChannels.direct("fileReadingResultChannel"))
            .get();
}

And the error log is...

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'fileReadingFlow' defined in class src.SftpOutBoundDsl: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'fileReadingFlow' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.file.remote.handler.FileTransferringMessageHandler@629a) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:602) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1111) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1006) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:504) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:762) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:686) at org.springframework.boot.SpringApplication.run(SpringApplication.java:320) at org.springframework.boot.SpringApplication.run(SpringApplication.java:957) at org.springframework.boot.SpringApplication.run(SpringApplication.java:946) at src.MainSftpOutBoundDsl.main(MainSftpOutBoundDsl.java:14)

1条回答
小情绪 Triste *
2楼-- · 2019-08-15 00:37

Change the inbound adapter channel to use...

MessageChannels.direct("fileReadingResultChannel")

and wire the two flows together...

return IntegrationFlows
        .from("fileReadingResultChannel")
        .handle(...

EDIT:

.channel(MessageChannels.direct("fileReadingResultChannel"))

The outbound channel adapter produces no result so you can't follow it with a channel.

The error message seems pretty clear to me...

... FileTransferringMessageHandler ... is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

...but if you think it needs clarification, let us know.

If you want the flow to continue after the file transfer, add a publishSubscribeChannel after the transformer and the message will go to another subscriber (flow) if the file transfer is successful.

查看更多
登录 后发表回答