Based on the code in the accepted answer, the following adjustment worked for me:
    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

        // wrap every step in its own flow so the flows can run side by side
        Flow[] flows = new Flow[steps.size()];
        for (int i = 0; i < steps.size(); i++) {
            flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
        }

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows)
                .build();
    }
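For intuition about what the split does: each sub-flow is handed to the task executor, and the split only completes once all of them have finished. The following is a rough plain-JDK analogue of that fan-out and join, not Spring Batch code. `ParallelStepsSketch`, `runAll`, and the string-returning tasks are hypothetical stand-ins for the steps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in: each Callable plays the role of one step.
class ParallelStepsSketch {

    // Runs every task concurrently and returns the results in submission order.
    static List<String> runAll(List<Callable<String>> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has completed (the "join").
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String query : Arrays.asList("query-1", "query-2", "query-3")) {
            tasks.add(() -> "done: " + query);
        }
        System.out.println(runAll(tasks));
    }
}
```

The tasks may finish in any order, but the results are collected in the order the tasks were submitted.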
I have updated the question to a version that loops correctly. However, as the application scales, being able to process in parallel will be important, and I still don't know how to do that dynamically at runtime with a Java config.
Refined question: How do I create a reader-processor-writer dynamically at runtime for, say, 5 different situations (5 queries means a loop of 5, as it is configured now)?
My LoopDecider looks like this:
    public class LoopDecider implements JobExecutionDecider {

        private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
        private static final String COMPLETED = "COMPLETED";
        private static final String CONTINUE = "CONTINUE";
        private static final String ALL = "queries";
        private static final String COUNT = "count";

        private int currentQuery;
        private int limit;

        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
            this.limit = allQueries.size();
            // expose the current query index through the job execution context
            jobExecution.getExecutionContext().put(COUNT, currentQuery);
            if (++currentQuery >= limit) {
                return new FlowExecutionStatus(COMPLETED);
            } else {
                LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
                return new FlowExecutionStatus(CONTINUE);
            }
        }
    }
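To see which sequence of statuses this counter logic yields, it can be exercised outside Spring Batch. This is a minimal sketch, assuming a plain `Map` stands in for the job's execution context; `LoopDeciderSketch` and `statusesFor` are hypothetical names, and only the counter logic is taken from the decider above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for LoopDecider: a plain Map replaces the ExecutionContext.
class LoopDeciderSketch {
    private int currentQuery;

    // Mirrors the counter logic of decide(): store the index, advance, compare.
    String decide(Map<String, Object> context) {
        @SuppressWarnings("unchecked")
        List<String> allQueries = (List<String>) context.get("queries");
        context.put("count", currentQuery);
        return (++currentQuery >= allQueries.size()) ? "COMPLETED" : "CONTINUE";
    }

    // Calls decide() until it reports COMPLETED and records every status.
    static List<String> statusesFor(List<String> queries) {
        Map<String, Object> context = new HashMap<>();
        context.put("queries", queries);
        LoopDeciderSketch decider = new LoopDeciderSketch();
        List<String> statuses = new ArrayList<>();
        String status;
        do {
            status = decider.decide(context);
            statuses.add(status);
        } while (!"COMPLETED".equals(status));
        return statuses;
    }

    public static void main(String[] args) {
        // prints [CONTINUE, CONTINUE, COMPLETED]
        System.out.println(statusesFor(Arrays.asList("q1", "q2", "q3")));
    }
}
```

With three queries the decider answers CONTINUE twice and then COMPLETED, so how often the step actually runs depends on whether the flow executes the step before the first decision.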
Based on a list of HQL queries, I want a reader-processor-writer for each query. My current configuration looks like this:
    public Job subsetJob() throws Exception {
        LoopDecider loopDecider = new LoopDecider();
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
        Flow flow = flowBuilder
                ...
        return jobBuilderFactory.get("subsetJob")
                ...
    }

    public Step extractData(){
        return stepBuilderFactory.get("extractData")
                ...
    }
    public HibernateCursorItemReader reader(){
        CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
        return reader;
    }

    public DynamicRecordProcessor processor(){
        return new DynamicRecordProcessor();
    }

    public FlatFileItemWriter writer(){
        CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();
        writer.setLineAggregator(new DelimitedLineAggregator(){{
            setFieldExtractor(new PassThroughFieldExtractor());
        }});
        return writer;
    }
Currently the process works fine for a single query. However, I actually have a list of queries.
My initial idea was to loop the step: pass it the list of queries and, for each query, read, process, and write. This would also lend itself to parallel chunking.
However, when I add the list of queries as a parameter to the extractData step and create a step for each query, the method returns a list of steps instead of the expected single step, and the job complains that it expects a single step rather than a list.
Another idea was to create a custom MultiHibernateCursorItemReader along the same lines as MultiResourceItemReader; however, I am really looking for a more out-of-the-box solution.
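The delegating-reader idea can be sketched in plain Java to show the moving parts: keep a list of readers, drain the current one, and switch to the next when it returns null. Everything here is hypothetical and independent of Spring Batch; `SimpleReader` is an assumed minimal contract where `read()` returns null once exhausted, and `MultiReaderSketch` stands in for the custom multi-query reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal reader contract: read() returns null once exhausted.
interface SimpleReader<T> {
    T read();
}

// Chains several delegate readers and drains them one after another.
class MultiReaderSketch<T> implements SimpleReader<T> {
    private final Iterator<SimpleReader<T>> delegates;
    private SimpleReader<T> current;

    MultiReaderSketch(List<SimpleReader<T>> readers) {
        this.delegates = readers.iterator();
    }

    @Override
    public T read() {
        while (true) {
            if (current == null) {
                if (!delegates.hasNext()) {
                    return null; // every delegate is exhausted
                }
                current = delegates.next();
            }
            T item = current.read();
            if (item != null) {
                return item;
            }
            current = null; // this delegate is done, move on to the next
        }
    }

    // Demo helper: a reader over a fixed list, standing in for one query's cursor.
    static <E> SimpleReader<E> over(List<E> items) {
        Iterator<E> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Drains a reader into a list.
    static <E> List<E> readAll(SimpleReader<E> reader) {
        List<E> out = new ArrayList<>();
        for (E item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleReader<String>> readers = new ArrayList<>();
        readers.add(over(Arrays.asList("a1", "a2")));
        readers.add(over(Arrays.asList("b1")));
        System.out.println(readAll(new MultiReaderSketch<String>(readers)));
    }
}
```

A reader like this processes the queries strictly one after another inside a single step, so on its own it does not give parallel execution.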
    public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
        List<Step> steps = new ArrayList<Step>();
        for (String query : queries) {
            ...
        }
        return steps;
    }
How do I loop the step and integrate that in the job?