I'm using Spring Batch
which loads data from Oracle
and put it into the MongoDB
. I'm looking to use the Spring Cloud Data Flow, but SCDF
doesn't have support for the MongoDB
.
Is there any way if we can maintain the SCDF
Metadata into the Postgres
(as its has good support) or H2, in this example I used H2, and run the actual batch jobs which load data from Oracle to MongoDB
.
When I tried to run this combination, I get the below error, certainly some config changes needs to be tweek. Any quick workaround ?
Error:
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$3c62d122.open(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at com.sun.proxy.$Proxy66.run(Unknown Source) [na:na]
at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.run(ReferenceMongoBatchApplication.java:63) [classes!/:1.0]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.main(ReferenceMongoBatchApplication.java:51) [classes!/:1.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [reference-mongo-batch-1.0.jar:1.0]
application.properties
# ORACLE DATASOURCE
spring.datasource.url=jdbc:oracle:thin:@//XXXXXX:1527/XXXX
spring.datasource.username=user
spring.datasource.password=password
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.validation-query= select 1
#spring.datasource.hikari.validationTimeout=30000
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=mgdb_drefdata_99
Config
@Configuration
public class EmployeeJob {
@Value( "${spring.chunk.size}")
private String chunkSize;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JdbcCursorItemReader<Employee> EmployeeReader;
@Autowired
public AsyncItemProcessor<Employee, Employee> asyncItemProcessor;
@Autowired
public AsyncItemWriter<Employee> asyncItemWriter;
@Bean
public EmployeeStepExecuListner EmployeeStepExecuListner() {
return new EmployeeStepExecuListner();
}
@Bean("readEmployeeJob")
@Primary
public Job readEmployeeJob() {
return jobBuilderFactory.get("readEmployeeJob")
.incrementer(new RunIdIncrementer())
.start(EmployeeStepOne())
.build();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Bean
public Step EmployeeStepOne() {
return stepBuilderFactory.get("EmployeeStepOne")
.<Employee, Employee>chunk(Integer.parseInt(chunkSize))
.reader(EmployeeReader)
.processor((ItemProcessor) asyncItemProcessor)
.writer(asyncItemWriter)
.build();
}
}
Config
@Configuration
public class EmployeeBatchConfig {
@Autowired
@Qualifier(value="oracleDS")
private DataSource dataSource;
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<Employee> EmployeeReader() throws Exception {
JdbcCursorItemReader<Employee> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource);
reader.setSql(SELECT_Employee_SQL);
reader.setRowMapper(new EmployeeRowMapper());
reader.afterPropertiesSet();
return reader;
}
@Bean
public ItemProcessor<Employee, Employee> EmployeeProcessor() {
return new EmployeeProcessor();
}
@Bean
public AsyncItemProcessor<Employee, Employee> asyncItemProcessor() throws Exception{
AsyncItemProcessor<Employee, Employee> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(EmployeeProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public EmployeeWriter EmployeeWriter() {
return new EmployeeWriter();
}
@Bean
public AsyncItemWriter<Employee> asyncItemWriter() throws Exception{
AsyncItemWriter<Employee> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(EmployeeWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
}