spring integration + cron + quartz in cluster?

2019-01-09 07:25发布

I have a spring integration flow triggered by the cron expression like follows:

<int-ftp:inbound-channel-adapter id="my-input-endpoint" ...>
    <int:poller trigger="my-trigger"/>
</int-ftp:inbound-channel-adapter>

<bean id="my-trigger"
   class="org.springframework.scheduling.support.CronTrigger">
  <constructor-arg value="0 * * * * *" />
</bean>

It works fine. But now I have to extend the implementation to make it cluster ready (job execution on only one cluster node at the same point of time).

My wish would be to use the Quartz framework in the cluster mode (persisting the job status in the database) to trigger this integration flow. Quartz provides a beautful solution out of the box. The only problem is how to integrate the Quartz with the existing inbout-channer-adaptor? The "trigger" attribute of the "poller" accepts only the subclasses of the org.springframework.scheduling.Trigger. I could not find any bridge between "poller trigger" and the Quartz framework.

many thanks in advance!

4条回答
Lonely孤独者°
2楼-- · 2019-01-09 08:02

Here's one way...

Set the auto-startup attribute on the inbound-adapter to false.

Create a custom trigger that only fires once, immediately...

public static class FireOnceTrigger implements Trigger {

    boolean done;

    public Date nextExecutionTime(TriggerContext triggerContext) {
        if (done) {
            return null;
        }
        done = true;
        return new Date();
    }

    public void reset() {
        done = false;
    }
}

In your quartz job, get a reference to the trigger and the SourcePollingChannelAdapter.

When the quartz trigger fires, have the quartz job

  1. adapter.stop()
  2. trigger.reset()
  3. adapter.start()
查看更多
该账号已被封号
3楼-- · 2019-01-09 08:10

tried to integrate the quartz and spring as you proposed but faced two other problems:

1.) IncompatibleClassChangeError exception when using Quartz 2.x and Spring 3.x. It is a known problem but I did not find any solution for that.

2.) Injection of other spring bean into the Quarz job instance. I found some solutions but no one works for me. I've tried the one with using

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="jobFactory">
        <bean class="org.springframework.scheduling.quartz.SpringBeanJobFactory" />
    </property>

    <property name="triggers">
        ...
    </property>

    <property name="schedulerContextAsMap">
        <map>
            <entry key="inputEndpoint" value-ref="my-endpoint" />
        </map>
    </property>
</bean>

to inject other beans into the job but after adding this property into the SchedulerFactoryBean the jobs is not being executed (and I dont see any exception). Removing the property "schedulerContextAsMap" out makes the job running again.

查看更多
Lonely孤独者°
4楼-- · 2019-01-09 08:12

the solution from Gary works. This my spring context:

<int-ftp:inbound-channel-adapter id="my-endpoint"
        auto-startup="false">
    <int:poller trigger="my-endpoint-trigger"/>
</int-ftp:inbound-channel-adapter>


<bean id="my-endpoint-trigger" class="com.my.FireOnceTrigger"/>

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">

    <property name="triggers">
        <list>
            <ref bean="my-job-trigger" />
        </list>
    </property>

    <property name="schedulerContextAsMap">
        <map>
            <entry key="inputEndpoint"><ref bean="my-input-endpoint" /></entry>
            <entry key="inputEndpointTrigger"><ref bean="my-endpoint-trigger" /></entry>
        </map>
    </property>
</bean>

<bean id="my-job-trigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="cronExpression" value="0 * * * * ?" />
    <property name="jobDetail" ref="my-job" />
</bean>

<bean name="my-job" class="org.springframework.scheduling.quartz.JobDetailBean">
    <property name="jobClass" value="com.my.MyActivatorJob " />
</bean>

and MyActivatorJob class:

public class MyActivatorJob extends QuartzJobBean implements {

private AbstractEndpoint inputEndpoint;

private FireOnceTrigger inputEndpointTrigger;

public void setInputEndpoint(final AbstractEndpoint pInputEndpoint) {
    this.inputEndpoint = pInputEndpoint;
}

public void setInputEndpointTrigger(final FireOnceTrigger pInputEndpointTrigger) {
    this.inputEndpointTrigger = pInputEndpointTrigger;
}

@Override
protected void executeInternal(final JobExecutionContext pParamJobExecutionContext)
throws JobExecutionException {

    inputEndpoint.stop();
    inputEndpointTrigger.reset();
    inputEndpoint.start();
}

}

As a next step this spring context would have to be refactored to replace the usage of schedulerContextAsMap with something more flexible and be able to define more jobs activating and deactivating many different endpoints.

Thanks Gary so far!

查看更多
趁早两清
5楼-- · 2019-01-09 08:23

I haven't tried it but see that the Quartz 2 and Spring compatibility issues seem to have been fixed in Spring 3.1.1. See https://jira.springsource.org/browse/SPR-8889

查看更多
登录 后发表回答