Give Priority to SFTP Remote Directories

Posted 2019-08-07 02:18

Question:

Using a single SFTP channel, I need to process two remote directories, lowpriority and highpriority, so that files in lowpriority are picked up only after the files in highpriority. How can I handle multiple directories in an SFTP inbound adapter with a single channel? This can be done with the Rotating Server Advice (https://docs.spring.io/spring-integration/reference/html/sftp.html#sftp-rotating-server-advice) in the Spring Integration 5.1.2 release, but what about the 4.3.12 release?

Answer 1:

It is not available in 4.3.x; the feature was added in 5.0.7.

It needs infrastructure changes so it will be hard to replicate with custom code in 4.3.x.

You could use two adapters and stop/start them as necessary.

EDIT

Here is one solution: the advice on the primary flow starts the secondary flow when no new files are found. The secondary flow runs just once and then restarts the primary flow, and the cycle continues.

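// Imports use the Spring Integration 5.x package names; with 4.3.x and the
// separate Java DSL artifact, some of these packages differ.
import java.io.File;
import java.util.Date;

import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.aopalliance.aop.Advice;

import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.messaging.Message;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;

@SpringBootApplication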
public class So54329898Application {

    public static void main(String[] args) {
        SpringApplication.run(So54329898Application.class, args);
    }

    @Bean
    public IntegrationFlow primary(SessionFactory<LsEntry> sessionFactory) {
        return IntegrationFlows.from(Sftp.inboundAdapter(sessionFactory)
                .localDirectory(new File("/tmp/foo"))
                .remoteDirectory("foo/foo"), e -> e
                        .poller(Pollers.fixedDelay(5_000, 5_000)
                                .advice(startSecondaryAdvice())))
                .channel("channel")
                .get();
    }

    @Bean
    public IntegrationFlow secondary(SessionFactory<LsEntry> sessionFactory) {
        return IntegrationFlows.from(Sftp.inboundAdapter(sessionFactory)
                .localDirectory(new File("/tmp/foo"))
                .remoteDirectory("foo/bar"), e -> e
                        .poller(Pollers.trigger(oneShotTrigger(sessionFactory)))
                        .autoStartup(false))
                .channel("channel")
                .get();
    }

    @Bean
    public IntegrationFlow main() {
        return IntegrationFlows.from("channel")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public Advice startSecondaryAdvice() {
        return new StartSecondaryWhenPrimaryIdle();
    }

    @Bean
    public FireOnceTrigger oneShotTrigger(SessionFactory<LsEntry> sessionFactory) {
        return new FireOnceTrigger((Lifecycle) primary(sessionFactory));
    }

    public static class StartSecondaryWhenPrimaryIdle extends AbstractMessageSourceAdvice
            implements ApplicationContextAware {

        private ApplicationContext applicationContext;

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            if (result == null) {
                // A null result means the primary poll found no new files.
                System.out.println("No more files on primary; starting single shot on secondary");
                this.applicationContext.getBean("primary", Lifecycle.class).stop();
                // Stop the secondary first so that start() schedules a fresh poll with the reset trigger.
                this.applicationContext.getBean("secondary", Lifecycle.class).stop();
                this.applicationContext.getBean(FireOnceTrigger.class).reset();
                this.applicationContext.getBean("secondary", Lifecycle.class).start();
            }
            return result;
        }

    }

    public static class FireOnceTrigger implements Trigger {

        private final Lifecycle primary;

        private volatile boolean done;

        public FireOnceTrigger(Lifecycle primary) {
            this.primary = primary;
        }

        @Override
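        public Date nextExecutionTime(TriggerContext triggerContext) {
            if (done) {
                // The single poll has already run: hand control back to the primary flow.
                System.out.println("One shot on secondary complete; restarting primary");
                this.primary.start();
                // Returning null tells the scheduler not to schedule another execution.
                return null;
            }
            // First call after reset(): fire once, immediately.
            done = true;
            return new Date();
        }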

        public void reset() {
            done = false;
        }

    }

}
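
The polling order that the solution above produces can be checked without an SFTP server. The following is only a plain-Java sketch of the control flow, not part of the original answer: the class name PriorityPollingSketch and the two in-memory queues standing in for the remote directories are hypothetical, and no Spring types are involved. Each call to poll() models one cycle: take a file from the high-priority directory if there is one, otherwise do a single shot on the low-priority directory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the primary/secondary flows; queues replace the remote directories.
final class PriorityPollingSketch {

    private final Queue<String> high;
    private final Queue<String> low;

    PriorityPollingSketch(Queue<String> high, Queue<String> low) {
        this.high = high;
        this.low = low;
    }

    /** One poll cycle; returns the next file to handle, or null if both directories are empty. */
    String poll() {
        String file = this.high.poll();
        if (file != null) {
            return file; // primary still has work, secondary is not touched
        }
        // Primary is idle: take at most one file from the secondary, then go back to the primary.
        return this.low.poll();
    }

    public static void main(String[] args) {
        Queue<String> high = new ArrayDeque<>(Arrays.asList("h1", "h2"));
        Queue<String> low = new ArrayDeque<>(Arrays.asList("l1", "l2"));
        PriorityPollingSketch poller = new PriorityPollingSketch(high, low);

        List<String> order = new ArrayList<>();
        order.add(poller.poll());
        order.add(poller.poll());
        order.add(poller.poll());
        high.add("h3"); // a high-priority file arrives while low-priority work is pending
        order.add(poller.poll());
        order.add(poller.poll());
        System.out.println(order); // prints [h1, h2, l1, h3, l2]
    }
}
```

Because the secondary only ever gets a single shot, a file that arrives in the high-priority directory waits for at most one low-priority file before it is picked up.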