Spring Batch resume after server's failure

Posted 2020-05-27 03:58

I am using Spring Batch to parse files and I have the following scenario:

I am running a job that has to parse a given file. For some unexpected reason (say, a power cut) the server fails and I have to restart the machine. After restarting the server, I want to resume the job from the point where it stopped before the power cut. That is, if the system had read 1,300 of 10,000 rows, it should now start reading from row 1,301.

How can I achieve this with Spring Batch?

About the configuration: I use spring-integration to poll a directory for new files. When a file arrives, spring-integration launches the Spring Batch job, which uses FlatFileItemReader to parse the file.

3 answers
混吃等死
#2 · 2020-05-27 04:18

Here is the complete solution to restart a job after a JVM crash.

1. Make the job restartable by setting restartable="true":

<job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true">

2. Code to restart a job

import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;

public class ResartJob {

    @Autowired
    private JobExplorer jobExplorer;
    @Autowired
    JobRepository jobRepository;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired 
    JobOperator jobOperator;

    public void restart(){
        try {
            List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
            if(CollectionUtils.isNotEmpty(jobInstances)){
               JobInstance jobInstance =  jobInstances.get(0);
               List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
               if(CollectionUtils.isNotEmpty(jobExecutions)){
                   for(JobExecution execution: jobExecutions){
                       // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                       if(execution.getStatus().equals(BatchStatus.STARTED)){ 
                           execution.setEndTime(new Date());
                           execution.setStatus(BatchStatus.FAILED);                               
                           execution.setExitStatus(ExitStatus.FAILED);                               
                           jobRepository.update(execution);
                           jobOperator.restart(execution.getId());
                       }
                   }
               }
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

3. Bean configuration

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>

<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>


<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="jobLauncherTaskExecutor" /> 
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-ref="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
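
The restart in step 2 can resume mid-file because FlatFileItemReader saves its read count in the step's ExecutionContext whenever a chunk commits, and skips that many items when the step is reopened. The following is a plain-Java simulation of that idea with no Spring dependency, not the framework's actual code; `ChunkCheckpointDemo`, `run`, `committedCount`, and the chunk size of 100 are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkCheckpointDemo {

    /** Stand-in for the persisted ExecutionContext entry that holds the read count. */
    static int committedCount = 0;

    /**
     * Processes rows in chunks, starting after the last committed row.
     * Stops abruptly (simulating a crash) once crashAfterRow rows have been read;
     * pass a negative value to run to completion.
     * Returns the row numbers whose chunk was committed during this run.
     */
    static List<Integer> run(int totalRows, int chunkSize, int crashAfterRow) {
        List<Integer> committedRows = new ArrayList<>();
        List<Integer> chunk = new ArrayList<>();
        for (int row = committedCount + 1; row <= totalRows; row++) {
            if (crashAfterRow >= 0 && row > crashAfterRow) {
                return committedRows; // the open chunk is lost, like a rolled-back transaction
            }
            chunk.add(row);
            if (chunk.size() == chunkSize || row == totalRows) {
                committedRows.addAll(chunk); // "write" the chunk
                committedCount = row;        // checkpoint is saved together with the commit
                chunk.clear();
            }
        }
        return committedRows;
    }

    public static void main(String[] args) {
        List<Integer> first = run(10_000, 100, 1_350);  // crash after reading row 1,350
        System.out.println("committed before crash: " + first.size()); // 1300
        List<Integer> second = run(10_000, 100, -1);    // restart
        System.out.println("restart began at row: " + second.get(0));  // 1301
    }
}
```

Note that the resume point is the last committed chunk boundary, not the exact row that was being read when the server went down, so rows in the open chunk are read again after the restart.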
一夜七次
#3 · 2020-05-27 04:20

What I would do in your situation is add a step that logs the last processed row to a file, then create a second job that reads this file and starts processing from that row number.

That way, if the job stops for whatever reason, you can run the second job to resume the processing.
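
A minimal sketch of this checkpoint-file idea in plain Java, without Spring Batch, might look like the following. `RowCheckpoint`, `load`, `save`, and `resume` are hypothetical names introduced for illustration, and the sketch assumes Java 11 or later for `Files.readString` and `Files.writeString`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

public class RowCheckpoint {

    /** Returns the last processed row number, or 0 when no checkpoint exists yet. */
    static long load(Path checkpoint) throws IOException {
        if (!Files.exists(checkpoint)) {
            return 0L;
        }
        return Long.parseLong(Files.readString(checkpoint).trim());
    }

    /** Overwrites the checkpoint with the given row number. */
    static void save(Path checkpoint, long row) throws IOException {
        Files.writeString(checkpoint, Long.toString(row));
    }

    /**
     * Processes every row after the checkpointed one and records progress
     * after each row. Returns the number of rows handled in this call.
     */
    static long resume(Path input, Path checkpoint, Consumer<String> handler) throws IOException {
        long done = load(checkpoint);
        long handled = 0;
        try (BufferedReader reader = Files.newBufferedReader(input, StandardCharsets.UTF_8)) {
            String line;
            long row = 0;
            while ((line = reader.readLine()) != null) {
                row++;
                if (row <= done) {
                    continue; // already processed before the failure
                }
                handler.accept(line);
                save(checkpoint, row);
                handled++;
            }
        }
        return handled;
    }
}
```

Calling `resume` again after a failure skips the rows recorded in the checkpoint. The row handler and the checkpoint write are not atomic here, so a crash between the two would cause that one row to be processed twice on the next run.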

够拽才男人
#4 · 2020-05-27 04:26

An updated workaround for Spring Batch 4. It takes the JVM start-up time into account when detecting broken jobs. Please note that this will not work in a clustered environment where multiple servers start jobs.

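import java.lang.management.ManagementFactory;
import java.util.Date;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.ContextRefreshedEvent;

// LOG is assumed to be a logger field (for example SLF4J) declared on the enclosing configuration class.
@Bean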
public ApplicationListener<ContextRefreshedEvent> resumeJobsListener(JobOperator jobOperator, JobRepository jobRepository,
        JobExplorer jobExplorer) {
    // restart jobs that failed due to
    return event -> {
        Date jvmStartTime = new Date(ManagementFactory.getRuntimeMXBean().getStartTime());

        // for each job
        for (String jobName : jobExplorer.getJobNames()) {
            // get latest job instance
            for (JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 1)) {
                // for each of the executions
                for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                    if (execution.getStatus().equals(BatchStatus.STARTED) && execution.getCreateTime().before(jvmStartTime)) {
                        // this job is broken and must be restarted
                        execution.setEndTime(new Date());
                        execution.setStatus(BatchStatus.FAILED);
                        execution.setExitStatus(ExitStatus.FAILED);

                        for (StepExecution se : execution.getStepExecutions()) {
                            if (se.getStatus().equals(BatchStatus.STARTED)) {
                                se.setEndTime(new Date());
                                se.setStatus(BatchStatus.FAILED);
                                se.setExitStatus(ExitStatus.FAILED);
                                jobRepository.update(se);
                            }
                        }

                        jobRepository.update(execution);
                        try {
                            jobOperator.restart(execution.getId());
                        }
                        catch (JobExecutionException e) {
                            LOG.warn("Couldn't resume job execution {}", execution, e);
                        }
                    }
                }
            }
        }
    };
}
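
The detection rule inside the listener can be looked at on its own: an execution counts as broken when it is still marked STARTED but was created before the current JVM came up. Below is a small plain-Java sketch of that predicate, using `java.time.Instant` instead of `Date`; `StaleExecutionCheck`, `Status`, and `isOrphaned` are hypothetical stand-ins and not Spring Batch types.

```java
import java.lang.management.ManagementFactory;
import java.time.Instant;

public class StaleExecutionCheck {

    /** Hypothetical stand-in for Spring Batch's BatchStatus, reduced to three values. */
    enum Status { STARTED, COMPLETED, FAILED }

    /**
     * An execution is treated as orphaned when it is still marked STARTED
     * but was created before the current JVM started, so no thread in this
     * process can be running it.
     */
    static boolean isOrphaned(Status status, Instant createTime, Instant jvmStart) {
        return status == Status.STARTED && createTime.isBefore(jvmStart);
    }

    public static void main(String[] args) {
        Instant jvmStart = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
        // Created a minute before this JVM started: left over from a crashed process.
        System.out.println(isOrphaned(Status.STARTED, jvmStart.minusSeconds(60), jvmStart)); // true
        // Created after this JVM started: may genuinely be running right now.
        System.out.println(isOrphaned(Status.STARTED, jvmStart.plusSeconds(60), jvmStart));  // false
    }
}
```

The rule only reasons about the local process, which is why it breaks down in a cluster: an execution started earlier by another server would also look orphaned to a node that has just booted.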