Spring Batch - How to generate parallel steps based on jobparameters

Published 2019-03-06 03:18

Question:

Introduction

I am trying to use jobparameters created in a tasklet to create steps following the execution of the tasklet.

A tasklet tries to find some files (findFiles()) and, if it finds any, saves the filenames to a list of strings.

In the tasklet I pass the data as follows: chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);

The next step is a parallel flow where for each file a simple reader-processor-writer step will be executed (if you are interested in how I got there please see my previous question: Spring Batch - Looping a reader/processor/writer step)

When the job readFilesJob() is built, the flow is initially created with a "fake" list of files, because the real list of files is only known after the tasklet has executed.

Question

How do I configure my job so the tasklet gets executed first and then the parallel flow gets executed using the list of files generated from the tasklet?

I think it comes down to getting the list of filenames loaded with the correct data at the correct moment during runtime... but how?

Reproduce

Here is my simplified configuration:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = files.stream().map(file -> createStep(file)).collect(Collectors.toList());

        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);

        Flow flow = flowBuilder
                .start(findFiles())             
                .next(createParallelFlow(steps))
                .build();       

        return jobBuilderFactory.get("readFilesJob")                
                .start(flow)                
                .end()
                .build();
    }

    private static Flow createParallelFlow(List<Step> steps){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream()
                .map(step ->
                        new FlowBuilder<Flow>("flow_" + step.getName()) 
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) 
             .add(flows.toArray(new Flow[flows.size()]))
             .build();      
    }

    private Step createStep(String fileName){
        return stepBuilderFactory.get("readFile" + fileName)
                .<String, String>chunk(100)
                .reader(reader(fileName))               
                .writer(writer(fileName))                               
                .build();
    }

    private FileFinder findFiles(){
        return new FileFinder();
    }
}

Research

The question and answer from How to safely pass params from Tasklet to step when running parallel jobs suggest the usage of a construct like this in the reader/writer:

@Value("#{jobExecutionContext[filePath]}") String filePath

However, I really hope it is possible to pass the fileName as a plain string to the reader/writer, given the way the steps are created in the createParallelFlow() method. So even though the answer to that question might be a solution to my problem here, it is not the desired solution. But please do not refrain from correcting me if I am wrong.
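
For completeness, the late-binding construct from that answer is normally used on a step-scoped bean, so the SpEL expression is resolved once per step execution. A minimal sketch of such a reader (assuming a FlatFileItemReader over plain strings and the filePath key from the linked answer; this is a configuration fragment, not my actual code):

```java
// Step-scoped: the @Value expression is evaluated for each step execution,
// so every parallel step resolves its own entry from the job execution
// context instead of receiving the file name at configuration time.
@Bean
@StepScope
public FlatFileItemReader<String> reader(
        @Value("#{jobExecutionContext['filePath']}") String filePath) {
    FlatFileItemReader<String> reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource(filePath));
    reader.setLineMapper(new PassThroughLineMapper());
    return reader;
}
```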

Closing

I am using the file names example to clarify the problem better. My problem is not actually the reading of multiple files from a directory. My question really boils down to the idea of generating data during runtime and passing it to the next dynamically generated step(s).

EDIT:

Added a simplified version of the FileFinder tasklet.

@Component
public class FileFinder implements Tasklet, InitializingBean {

    List<String> fileNames = new ArrayList<>();

    public List<String> getFileNames() {
        return fileNames;
    }

    @Override
    public void afterPropertiesSet() {
        // read the filenames and store them in the list
        fileNames.add("sample-data1.csv");
        fileNames.add("sample-data2.csv");
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Execution of methods that will find the file names and put them in the list...
        chunkContext.getStepContext().getStepExecution().getExecutionContext().put("files", fileNames);                     
        return RepeatStatus.FINISHED;
    }    
}
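
One caveat with the tasklet above: values put into the *step* execution context are not automatically visible under `jobExecutionContext[...]` in later steps. Spring Batch's `ExecutionContextPromotionListener` can promote selected keys to the job execution context when the step completes. A sketch of how the tasklet step could be wired up with it (the step name `findFilesStep` is illustrative, not from my configuration):

```java
@Bean
public Step findFilesStep(StepBuilderFactory stepBuilderFactory, FileFinder fileFinder) {
    // Copy the "files" key from the step execution context into the job
    // execution context once the tasklet step has finished successfully.
    ExecutionContextPromotionListener promotionListener = new ExecutionContextPromotionListener();
    promotionListener.setKeys(new String[] {"files"});

    return stepBuilderFactory.get("findFilesStep")
            .tasklet(fileFinder)
            .listener(promotionListener)
            .build();
}
```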

Answer 1:

I'm not sure I understood your problem correctly, but as far as I can see, you need the list of filenames before you build your job dynamically.

You could do it like this:

@Component
public class MyJobSetup {
    List<String> fileNames;

    public List<String> getFileNames() {
        return fileNames;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        // read the filenames and store them in the list
        fileNames = ....;
    }
}
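
The elided `fileNames = ....;` line could, for example, list a directory with `java.nio.Files`. A minimal, self-contained sketch (the class name `FileLister` and the helper `listFileNames` are illustrative, not part of the answer):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileLister {

    // Collect the names of the regular files directly inside dir (non-recursive).
    static List<String> listFileNames(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile)
                          .map(p -> p.getFileName().toString())
                          .sorted()
                          .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("batch-input");
        Files.createFile(dir.resolve("sample-data1.csv"));
        Files.createFile(dir.resolve("sample-data2.csv"));
        System.out.println(listFileNames(dir)); // [sample-data1.csv, sample-data2.csv]
    }
}
```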

After that, you can inject this bean into your job configuration class:

@Configuration
@EnableBatchProcessing
@Import(MyJobSetup.class)
public class BatchConfiguration {

    private static final String FLOW_NAME = "flow1";
    private static final String PLACE_HOLDER = "empty";

    @Autowired
    private MyJobSetup jobSetup; // <-- inject; MyJobSetup's @PostConstruct has
                                 // already run by the time it is injected here

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    public List<String> files = Arrays.asList(PLACE_HOLDER);

    @Bean
    public Job readFilesJob() throws Exception {   
        List<Step> steps = jobSetup.getFileNames() // get the list of files
             .stream() // as stream
             .map(file -> createStep(file)) // map...
             .collect(Collectors.toList()); // and create the list of steps