JDBC从梁甲骨文取(JDBC Fetch from oracle with Beam)

2019-09-28 11:29发布

下面的程序是连接到Oracle 11g和获取记录。 它给如何过我NullPointerException异常的编码器在pipeline.apply()。

我已经加入了ojdbc14.jar复制到项目的依赖。

public static void main(String[] args) {

        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());      
         p.apply(JdbcIO.<KV<Integer, String>>read()
                   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                          "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@hostdnsname:port/servicename")
                   .withUsername("uname")
                   .withPassword("pwd"))
                   .withQuery("select EMPID,NAME from EMPLOYEE1")
                   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
                     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                       return KV.of(resultSet.getInt(1), resultSet.getString(2));
                     }
                   }));
         p.run();

    }

是给下面的error.Any线索?

Exception in thread "main" java.lang.NullPointerException: coder
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:283)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:216)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:399)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:307)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:158)
    at org.apache.beam.examples.v030.JdbcUtil.main(JdbcUtil.java:21)

Answer 1:

嗨,您好!

遗憾的错误消息不是非常有帮助的,但实际上它是一个验证步骤。 我已经申请BEAM-959 ,以改善这一点。

您需要提供一个编码器,例如通过

.withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())`

我已经申请BEAM-960 ,以改善该编码器的自动化,就像我们在梁大部等地。



Answer 2:

尝试这个。

 pipeline.apply(( JdbcIO.<KV<Integer, String>>read().withCoder(KvCoder.of(VarIntCoder.of(),StringUtf8Coder.of())) 
               .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                      "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/deepakgoyal")
                    .withUsername("root")
                    .withPassword("root"))
               .withQuery("select empid, name from employee")

               .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
                 public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                   return KV.of(resultSet.getInt(1), resultSet.getString(2));
                 }
               })
             ))

而且不要忘了添加MySQL连接器JAR在你的项目。 提前致谢。



文章来源: JDBC Fetch from oracle with Beam