I am using Spring Batch to read data from Postgres and write it into MongoDB. In my case, an Employee has 3 different types of email address in the Employee Email table: 1) Home Address, 2) Office Address, 3) Social Address.
Since we have almost 10 lakh (1 million) employees in the DB, I am using custom partitions to pull the data from Postgres and joining with employee_email (later employee_phone as well), so that the processor can map each record to the Mongo POJO and save it into MongoDB.
Now the issue is that I need to embed the employee email records into the Contact document as an array, but with the current logic they are being saved as a separate collection.
How can we solve this issue?
select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;
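For reference, the limit/offset pairs that feed this query could be computed along the following lines. This is only a sketch: the EmployeesPartitions implementation is not shown in the post, so the class name `OffsetPartitionSketch` and the method `pages` are hypothetical, and only the `limit` and `offset` keys are taken from the reader's `stepExecutionContext` expressions. One thing worth noting is that the limit applies to the joined rows rather than to employees, so an employee's three email rows may straddle two pages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of the original job configuration.
public class OffsetPartitionSketch {

    // Builds one {limit, offset} map per page of pageSize rows.
    // The last page may return fewer rows than its limit.
    static List<Map<String, Long>> pages(long totalRows, long pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Map<String, Long>> result = new ArrayList<>();
        for (long offset = 0; offset < totalRows; offset += pageSize) {
            Map<String, Long> page = new LinkedHashMap<>();
            page.put("limit", pageSize);   // same key the reader reads from the step context
            page.put("offset", offset);
            result.add(page);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed numbers for illustration: 2500 joined rows, 1000 rows per page.
        for (Map<String, Long> page : pages(2500, 1000)) {
            System.out.println("limit " + page.get("limit") + " offset " + page.get("offset"));
        }
        // prints:
        // limit 1000 offset 0
        // limit 1000 offset 1000
        // limit 1000 offset 2000
    }
}
```

If pages are cut on joined rows like this, partitioning by employee_id ranges instead of row offsets may be a safer way to keep all of one employee's emails in the same partition.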
Now when the data is saved into the DB, only one email is getting saved, and it appears that the other 2 are getting overwritten.
How should I handle that? It looks like EmployeeRowMapper is overwriting all the other email addresses. How can I solve this issue?
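To make the symptom concrete: the join returns one row per email, so the same employee arrives as three separate items, and a RowMapper only ever sees one row at a time. The sketch below shows, in plain Java without Spring, how joined rows could be collapsed into one object per employee with an embedded email list. All names here (`EmailGroupingSketch`, `JoinedRow`, `EmployeeDoc`, `Email`, and the `emailType` / `emailAddress` fields) are hypothetical stand-ins, since the real DTOs and column names are not shown in the post. It assumes the rows are available as a list; in a Spring Batch step the same grouping would have to happen at the reader level, for example in a custom reader that keeps reading while employee_id stays the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: none of these types come from the original post.
public class EmailGroupingSketch {

    // One row of the join result: employee columns repeated once per email.
    static class JoinedRow {
        final long employeeId;
        final String name;
        final String emailType;     // null when the employee has no email row
        final String emailAddress;  // null when the employee has no email row

        JoinedRow(long employeeId, String name, String emailType, String emailAddress) {
            this.employeeId = employeeId;
            this.name = name;
            this.emailType = emailType;
            this.emailAddress = emailAddress;
        }
    }

    static class Email {
        final String type;
        final String address;

        Email(String type, String address) {
            this.type = type;
            this.address = address;
        }
    }

    // Shape of the target document: one employee with an embedded email array.
    static class EmployeeDoc {
        final long employeeId;
        final String name;
        final List<Email> emails = new ArrayList<>();

        EmployeeDoc(long employeeId, String name) {
            this.employeeId = employeeId;
            this.name = name;
        }
    }

    // Collapses the joined rows into one EmployeeDoc per employee_id,
    // appending each email instead of replacing the previous one.
    static List<EmployeeDoc> groupByEmployee(List<JoinedRow> rows) {
        Map<Long, EmployeeDoc> byId = new LinkedHashMap<>();
        for (JoinedRow row : rows) {
            EmployeeDoc doc = byId.get(row.employeeId);
            if (doc == null) {
                doc = new EmployeeDoc(row.employeeId, row.name);
                byId.put(row.employeeId, doc);
            }
            if (row.emailAddress != null) {
                doc.emails.add(new Email(row.emailType, row.emailAddress));
            }
        }
        return new ArrayList<>(byId.values());
    }

    public static void main(String[] args) {
        List<JoinedRow> rows = Arrays.asList(
                new JoinedRow(1L, "Asha", "HOME", "asha@home.example"),
                new JoinedRow(1L, "Asha", "OFFICE", "asha@office.example"),
                new JoinedRow(1L, "Asha", "SOCIAL", "asha@social.example"),
                new JoinedRow(2L, "Ravi", "HOME", "ravi@home.example"));
        for (EmployeeDoc doc : groupByEmployee(rows)) {
            System.out.println(doc.employeeId + " -> " + doc.emails.size() + " email(s)");
        }
    }
}
```

With grouping like this, the writer would receive one item per employee, so saving by employee id would no longer replace a document that holds a different email.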
public class EmployeeJob {
    private static Logger logger = LoggerFactory.getLogger(EmployeeJob.class);
    private static final Integer CHUNK_SIZE = 1000;

    private DataSource dataSource;
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;

    public EmployeesPartitions EmployeesPartition() {
        return new EmployeesPartitions();
    }

    public EmployeesJobListener EmployeesJobListener() {
        return new EmployeesJobListener();
    }

    public Job readEmployeeJob() throws Exception {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                // ...
    }

    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
        return taskExecutor;
    }

    public Step EmployeeStepOne() throws Exception {
        return stepBuilderFactory.get("EmployeeStepOne")
                .partitioner(slaveStep().getName(), EmployeesPartition())
                // ...
    }

    // slave step
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
                .reader(EmployeeReader(null, null))
                // ...
    }

    // Readers
    @Bean(destroyMethod = "")
    public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
            @Value("#{stepExecutionContext['limit']}") Long limit,
            @Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {
        String sql = "select * from root.Employees c "
                + "full outer join root.Employee_email ce "
                + "on c.Employee_id = ce.Employee_id "
                + "order by c.Employee_id limit " + limit + " offset " + offset;
        logger.info("Employees SQL = {} ", sql);
        JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
        reader.setRowMapper(new EmployeeRowMapper());
        return reader;
    }

    // Processors
    public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
        return new EmployeesProcessor();
    }

    // Writers
    public ItemWriter<EmployeesDTO> EmployeeWriter() {
        return new EmployeeWriter();
    }
}
public class EmployeeRowMapper implements RowMapper<Employee> {
    public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
        // EmployeeEmail email = new EmployeeEmail();
        List<EmployeeEmail> employeeEmails = new ArrayList<>();
        Employee dto = Employee.builder()
                // ...
        return dto;
    }
}