SCDF - Create Metadata tables in different schema

2020-07-27 03:59发布

问题:

I'm using Spring Batch which loads data from Oracle and put it into the MongoDB. I'm looking to use the Spring Cloud Data Flow, but SCDF doesn't have support for the MongoDB.

Is there any way if we can maintain the SCDF Metadata into the Postgres (as its has good support) or H2, in this example I used H2, and run the actual batch jobs which load data from Oracle to MongoDB.

When I tried to run this combination, I get the below error, certainly some config changes needs to be tweek. Any quick workaround ?

Error:

  org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$3c62d122.open(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
    at com.sun.proxy.$Proxy66.run(Unknown Source) [na:na]
    at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.run(ReferenceMongoBatchApplication.java:63) [classes!/:1.0]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
    at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.main(ReferenceMongoBatchApplication.java:51) [classes!/:1.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [reference-mongo-batch-1.0.jar:1.0]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [reference-mongo-batch-1.0.jar:1.0]

application.properties

# ORACLE DATASOURCE
spring.datasource.url=jdbc:oracle:thin:@//XXXXXX:1527/XXXX
spring.datasource.username=user
spring.datasource.password=password
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.validation-query= select 1
#spring.datasource.hikari.validationTimeout=30000

spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=mgdb_drefdata_99

Config

@Configuration
public class EmployeeJob {
    @Value( "${spring.chunk.size}")
    private String chunkSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JdbcCursorItemReader<Employee> EmployeeReader;

    @Autowired
    public AsyncItemProcessor<Employee, Employee> asyncItemProcessor;

    @Autowired
    public AsyncItemWriter<Employee> asyncItemWriter;

    @Bean
    public EmployeeStepExecuListner EmployeeStepExecuListner() {
        return new EmployeeStepExecuListner();
    }

    @Bean("readEmployeeJob")
    @Primary
    public Job readEmployeeJob() {
        return jobBuilderFactory.get("readEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .start(EmployeeStepOne())
                .build();
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Bean
    public Step EmployeeStepOne() {
        return stepBuilderFactory.get("EmployeeStepOne")
                .<Employee, Employee>chunk(Integer.parseInt(chunkSize))
                .reader(EmployeeReader)
                .processor((ItemProcessor) asyncItemProcessor)
                .writer(asyncItemWriter)
                .build();
    }
}

Config

@Configuration
public class EmployeeBatchConfig {

    @Autowired
    @Qualifier(value="oracleDS")
    private DataSource dataSource;

    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader<Employee> EmployeeReader() throws Exception {
        JdbcCursorItemReader<Employee> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setSql(SELECT_Employee_SQL);

        reader.setRowMapper(new EmployeeRowMapper());
        reader.afterPropertiesSet();
        return reader;
    }

    @Bean
    public ItemProcessor<Employee, Employee> EmployeeProcessor() {
        return new EmployeeProcessor();
    }

    @Bean
    public AsyncItemProcessor<Employee, Employee> asyncItemProcessor() throws Exception{
        AsyncItemProcessor<Employee, Employee> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(EmployeeProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean
    public EmployeeWriter EmployeeWriter() {
        return new EmployeeWriter();
    }


    @Bean
    public AsyncItemWriter<Employee> asyncItemWriter() throws Exception{
        AsyncItemWriter<Employee> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(EmployeeWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;  
    }
}