I am using spring batch to parse files and I have the following scenario:
I am running a job. This job has to parse a giving file. For unexpected reason (let say for power cut) the server fails and I have to restart the machine. Now, after restarting the server I want to resume the job from the point which stopped before the power cut. This means that if the system read 1.300 rows from 10.000 now have to start reading from 1.301 row.
How can I achieve this scenario using spring batch?
About configuration: I use spring-integration which polls under a directory for new files. When a file is arrived the spring-integration creates the spring batch job. Also, spring-batch uses FlatFileItemReader to parse the file.
Here is the complete solution to restart a job after JVM crash.
- Make a job restartable by making restarable="true"
job id="jobName" xmlns="http://www.springframework.org/schema/batch"
restartable="true"
2 . Code to restart a job
import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
public class ResartJob {
@Autowired
private JobExplorer jobExplorer;
@Autowired
JobRepository jobRepository;
@Autowired
private JobLauncher jobLauncher;
@Autowired
JobOperator jobOperator;
public void restart(){
try {
List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
if(CollectionUtils.isNotEmpty(jobInstances)){
JobInstance jobInstance = jobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
if(CollectionUtils.isNotEmpty(jobExecutions)){
for(JobExecution execution: jobExecutions){
// If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
if(execution.getStatus().equals(BatchStatus.STARTED)){
execution.setEndTime(new Date());
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED);
jobRepository.update(execution);
jobOperator.restart(execution.getId());
}
}
}
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
3.
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>
<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>
<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean> <task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-re`enter code here`f="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
An updated work-around for Spring batch 4. Takes JVM start up time into account for broken jobs detection. Please note that this will not work when in a clustered environment where multiple servers start jobs.
@Bean
public ApplicationListener<ContextRefreshedEvent> resumeJobsListener(JobOperator jobOperator, JobRepository jobRepository,
JobExplorer jobExplorer) {
// restart jobs that failed due to
return event -> {
Date jvmStartTime = new Date(ManagementFactory.getRuntimeMXBean().getStartTime());
// for each job
for (String jobName : jobExplorer.getJobNames()) {
// get latest job instance
for (JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 1)) {
// for each of the executions
for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
if (execution.getStatus().equals(BatchStatus.STARTED) && execution.getCreateTime().before(jvmStartTime)) {
// this job is broken and must be restarted
execution.setEndTime(new Date());
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED);
for (StepExecution se : execution.getStepExecutions()) {
if (se.getStatus().equals(BatchStatus.STARTED)) {
se.setEndTime(new Date());
se.setStatus(BatchStatus.FAILED);
se.setExitStatus(ExitStatus.FAILED);
jobRepository.update(se);
}
}
jobRepository.update(execution);
try {
jobOperator.restart(execution.getId());
}
catch (JobExecutionException e) {
LOG.warn("Couldn't resume job execution {}", execution, e);
}
}
}
}
}
};
}
What I would do in your situation is to create a step to log the last processed row in a file. Then create a second job that would read this file and start the processing from a specific row number.
So if the job stops due to whatever reason you will be able to run the new Job that will resume the processing.