I am new to Spring Batch and couldn't figure out how to do this.
Basically, I have a Spring file poller that runs every N minutes to look for files with specific names (e.g. A.txt and B.txt) in a certain directory. At any moment there can be at most 2 files in this directory (A and B). A Spring Batch job processes these two files and persists them to 2 different DB tables.
These files are somewhat similar, so the same processor/writer is used.
Right now, the way I have set it up, each polling cycle picks up 1 file and runs the job.
Let's say there are 2 files in the directory (A.txt and B.txt). Is there a way to create 2 jobs so that both jobs can be run in parallel?
I believe that you can. Since you are new to Spring Batch (just like me), I would recommend that you go through the domain language of batch if you haven't done so already.
Then you may start by configuring your own asynchronous JobLauncher. For example:
@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    // An asynchronous executor lets run() return without waiting for the job to finish
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}
Pay special attention to SimpleAsyncTaskExecutor (the job repository can be autowired). This configuration allows asynchronous execution, as visualized next:
Compare it with the synchronous execution flow:
Maybe it would additionally help to quote the SimpleJobLauncher Javadoc:
Simple implementation of the JobLauncher interface. The Spring Core
TaskExecutor interface is used to launch a Job. This means that the
type of executor set is very important. If a SyncTaskExecutor is used,
then the job will be processed within the same thread that called the
launcher. Care should be taken to ensure any users of this class
understand fully whether or not the implementation of TaskExecutor
used will start tasks synchronously or asynchronously. The default
setting uses a synchronous task executor.
More details and configuration options - here.
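To make the point of the quoted Javadoc concrete, here is a minimal sketch in plain Java that uses no Spring classes at all. The class name LauncherThreadingSketch, the SYNC and ASYNC executors, and the thread name "job-thread" are all hypothetical stand-ins: SYNC mimics what a synchronous task executor does, and ASYNC mimics a thread-per-task executor. It only illustrates which thread ends up running the work, not how SimpleJobLauncher is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class LauncherThreadingSketch {

    // Stand-in for a synchronous executor: the task runs in the caller's thread.
    static final Executor SYNC = Runnable::run;

    // Stand-in for a thread-per-task executor: every task gets a fresh thread.
    static final Executor ASYNC = task -> new Thread(task, "job-thread").start();

    // Runs one task through the executor and returns the name of the thread that executed it.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> executedOn = new CompletableFuture<>();
        executor.execute(() -> executedOn.complete(Thread.currentThread().getName()));
        return executedOn.join(); // wait until the task has reported back
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("sync:   " + threadUsedBy(SYNC));
        System.out.println("async:  " + threadUsedBy(ASYNC));
    }
}
```

With the synchronous stand-in, the caller is blocked for as long as the task runs, which is why a poller using the default launcher would only ever process one file at a time.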
Finally, just create the jobs with different names and/or launch them with different parameter sets. A naive example would be:
@Autowired
public JobBuilderFactory jobBuilderFactory;

public Job createJobA() {
    return jobBuilderFactory.get("A.txt")
            .incrementer(new RunIdIncrementer())
            .flow(step1())
            .next(step2())
            .end()
            .build();
}

public Job createJobB() {
    return jobBuilderFactory.get("B.txt")
            .incrementer(new RunIdIncrementer())
            .flow(step1())
            .next(step2())
            .end()
            .build();
}
Executing these jobs with your asynchronous job launcher will create two job instances that execute in parallel. This is just one option, which may or may not be suitable for your context.
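As a rough illustration of what "execute in parallel" means here, the following plain-Java sketch starts one task per file name and checks that all of them were running at the same time. It deliberately uses no Spring Batch classes; ParallelFileJobsSketch and ranInParallel are hypothetical names, and the println stands in for the real reading and writing a job would do. Each task waits for the others to start, so the method can only return true if the tasks really overlap.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelFileJobsSketch {

    // Starts one task per file name; returns true only if all tasks were running at once.
    static boolean ranInParallel(List<String> fileNames) {
        ExecutorService executor = Executors.newFixedThreadPool(fileNames.size());
        CountDownLatch allStarted = new CountDownLatch(fileNames.size());
        CountDownLatch allFinished = new CountDownLatch(fileNames.size());
        try {
            for (String fileName : fileNames) {
                executor.execute(() -> {
                    allStarted.countDown();
                    try {
                        // Each task waits until every other task has started as well.
                        if (allStarted.await(5, TimeUnit.SECONDS)) {
                            System.out.println("processing " + fileName);
                            allFinished.countDown();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return allFinished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("parallel: " + ranInParallel(Arrays.asList("A.txt", "B.txt")));
    }
}
```

If the tasks were run one after another on a single thread, the first one would time out waiting for the second to start and the method would return false.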
There are good ways to run jobs asynchronously with Spring; it is just a matter of how the JobLauncher is configured. The JobLauncher has a taskExecutor property, and whether execution is asynchronous depends on the implementation assigned to that property.
You can look through all the TaskExecutor types that Spring provides and, depending on your needs, select the one that best fits your asynchronous batch jobs: Task Executor Types in Spring
For example, SimpleAsyncTaskExecutor is a task executor that creates a new Thread on every invocation, which could cause performance issues if it is invoked at high frequency. On the other hand, there are also TaskExecutor types that provide pooling in order to reuse resources and maximize the efficiency of the system.
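The reuse that pooling gives you can be seen with the JDK's own thread pool. This is only a sketch under stated assumptions: PooledExecutorSketch and distinctThreadsUsed are hypothetical names, and Executors.newFixedThreadPool is used here as a plain-Java stand-in for a pooled Spring TaskExecutor. It runs many small tasks and counts how many distinct threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PooledExecutorSketch {

    // Runs taskCount small tasks on a pool of poolSize threads
    // and returns how many distinct threads executed them.
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, let the queued ones finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("threads used for 100 tasks: " + distinctThreadsUsed(2, 100));
    }
}
```

However many tasks are submitted, the number of threads never exceeds the pool size, whereas a thread-per-task executor would have created one thread for each of them.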
Here is a small example of how to configure a ThreadPoolTaskExecutor:
A) Configure ThreadPoolTaskExecutor Bean
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(15);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(30);
    return taskExecutor;
}
B) Configure JobLauncher Bean
@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository) {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setTaskExecutor(taskExecutor);
    jobLauncher.setJobRepository(jobRepository);
    return jobLauncher;
}
C) Inject your JobLauncher and your Jobs configuration
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("job1-file-A")
private Job job1;
@Autowired
@Qualifier("job2-file-B")
private Job job2;
D) Schedule the jobs
// This cron expression fires every second
@Scheduled(cron = "*/1 * * * * *")
public void run1() {
    // A fresh "time" parameter makes every launch a new job instance
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        jobLauncher.run(job1, jobParameters);
    } catch (Exception ex) {
        logger.error(ex.getMessage());
    }
}

@Scheduled(cron = "*/1 * * * * *")
public void run2() {
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        jobLauncher.run(job2, jobParameters);
    } catch (Exception ex) {
        logger.error(ex.getMessage());
    }
}
E) Finally, on your Spring Boot class, add @EnableBatchProcessing and @EnableScheduling
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {