Spring Batch Partitions Join overriding the RowMapper

Published 2020-07-28 08:09

Question:

I am using Spring Batch to read data from Postgres and write it into MongoDB. In my case, an Employee has 3 different types of email address in the Employee Email table: 1) Home Address 2) Office Address 3) Social Address.

Since we have almost 10 lakh (1 million) employees in the DB, I am using a custom partitioner to pull the data from Postgres, joining with employee_email (and later employee_phone as well), so that the processor can map to the Mongo POJO and save it into MongoDB.

Now the issue is that I need to embed the employee email records into the Contact as an array, but with the current logic they are being saved as a separate collection.

How can I solve this issue?

select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;

Now when the data is saved into the DB, only one email is saved; the other two appear to be overridden.

It looks like EmployeeRowMapper is overriding all but one email address. How should I handle this?

EmployeeJob.java

@Configuration
public class EmployeeJob {
    private static Logger logger = LoggerFactory.getLogger(EmployeeJob.class);

    private static final Integer CHUNK_SIZE = 1000;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    @Bean
    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    @Bean("readEmployeeJob")
    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .listener(EmployeesJobListener())
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
        taskExecutor.setThreadGroupName("Employees-Thread");
        taskExecutor.setDaemon(false);
        taskExecutor.setThreadPriority(5);
        return taskExecutor;
    }

    @Bean
    public Step EmployeeStepOne() throws Exception {            
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                .step(slaveStep())
                .gridSize(10)
                .taskExecutor(simpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                .reader(EmployeeReader(null, null))
                .processor(EmployeeProcessor())
                .writer(EmployeeWriter())
                .build();
    }


    // Readers
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {

        String sql = "select * from root.Employees c "
                + "full outer join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id limit " + limit +" offset "+ offset;
        logger.info("Employees SQL = {} ", sql);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(sql);
        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    // Processors
    @Bean
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    // Writers
    @Bean
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}

EmployeeRowMapper.java

public class EmployeeRowMapper implements RowMapper<EmployeesDTO> {

    @Override
    public EmployeesDTO mapRow(ResultSet rs, int rowNum) throws SQLException {
        EmployeeEmail email = new EmployeeEmail();
        // ... populate email from the current row ...
        List<EmployeeEmail> employeeEmails = new ArrayList<>();
        employeeEmails.add(email);

        EmployeesDTO dto = EmployeesDTO.builder()
                .businessTitle(rs.getString(""))
                // ... other fields ...
                .employeeEmails(employeeEmails)
                .build();

        return dto;
    }
}

Answer 1:

A RowMapper maps a single database row to a POJO. Your join produces one row per email, so the mapper builds a fresh Employee for each of those rows, each holding a single email. Unless each row contains all emails in different columns (for example id, name, email1, email2, email3), what you are trying to do won't work.
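Since the question has a fixed set of three address types, the "all emails in different columns" shape can be produced with conditional aggregation, so each result-set row carries every email and a single-row mapper works unchanged. The `email_type`/`email_address` column names and the `'HOME'`/`'OFFICE'`/`'SOCIAL'` values below are assumptions about the schema, not taken from the question. A minimal sketch:

```java
import java.util.ArrayList;
import java.util.List;

class PivotedEmailExample {

    // Pivot the 1-to-3 email rows into columns so one result-set row
    // carries every email for an employee. Column/type names are
    // assumptions about the real schema.
    static final String SQL =
        "select c.employee_id, c.business_title, "
      + "  max(case when ce.email_type = 'HOME'   then ce.email_address end) as home_email, "
      + "  max(case when ce.email_type = 'OFFICE' then ce.email_address end) as office_email, "
      + "  max(case when ce.email_type = 'SOCIAL' then ce.email_address end) as social_email "
      + "from root.employees c "
      + "left join root.employee_email ce on c.employee_id = ce.employee_id "
      + "group by c.employee_id, c.business_title "
      + "order by c.employee_id limit ? offset ?";

    // In the RowMapper: collect the pivoted columns into the embedded
    // list, dropping types the employee does not have (NULL columns).
    static List<String> nonNullEmails(String... emails) {
        List<String> out = new ArrayList<>();
        for (String e : emails) {
            if (e != null) {
                out.add(e);
            }
        }
        return out;
    }
}
```

The mapper would then call something like `nonNullEmails(rs.getString("home_email"), rs.getString("office_email"), rs.getString("social_email"))` to build the embedded array. This only works while the set of email types stays fixed.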

If each item instead has 3 rows for emails, make your query return only id and name, and use an additional query to fetch the emails. This additional query can be done either in the mapper itself or in an item processor, as described in the driving query pattern.
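The item-processor variant of that driving-query pattern could look like the sketch below. It assumes the reader's SQL is reduced to select from root.employees only, and that EmployeesDTO exposes `getEmployeeId()`/`setEmployeeEmails()` and EmployeeEmail has a `(type, address)` constructor — assumptions about the question's classes, not confirmed API. The column names in the second query are likewise assumed:

```java
import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;

// Driving-query pattern: the reader returns employees only; this
// processor runs one extra query per item to attach that employee's
// email rows, which the writer then embeds as an array in Mongo.
public class EmployeeEmailEnricher implements ItemProcessor<EmployeesDTO, EmployeesDTO> {

    private final JdbcTemplate jdbcTemplate;

    public EmployeeEmailEnricher(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public EmployeesDTO process(EmployeesDTO employee) {
        // Column names are assumptions about the employee_email schema.
        List<EmployeeEmail> emails = jdbcTemplate.query(
                "select email_type, email_address from root.employee_email "
                        + "where employee_id = ?",
                (rs, rowNum) -> new EmployeeEmail(rs.getString("email_type"),
                                                  rs.getString("email_address")),
                employee.getEmployeeId());
        employee.setEmployeeEmails(emails);
        return employee;
    }
}
```

Note this trades the join for an extra query per employee (N+1); with ~1 million employees it is worth measuring, and if it is too slow, aggregating the joined rows in a custom grouping reader is the usual alternative.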