Apache Beam: Programmatically create partitioned tables

Posted 2019-08-19 00:23

I am writing a Cloud Dataflow pipeline that reads messages from Pub/Sub and stores them in BigQuery. I want to use a date-partitioned table, and I am using the timestamp associated with each message to determine which partition the message should go into. Below is my code:

    BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                log.info("Row value : {}", value.getValue());
                Instant timestamp = value.getTimestamp();
                String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
                TableDestination td = new TableDestination(
                    "<project>:<dataset>.<table>" + "$" + partition, null);
                log.info("Table Destination : {}", td);
                return td;
            }
        })
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withSchema(tableSchema);
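
The partition-suffix step in the function above can be checked in isolation. The following is a minimal sketch, not the pipeline code itself: it swaps Joda-Time for `java.time` so that it runs without Beam on the classpath, and it pins the zone to UTC, which is an assumption. The class name `PartitionDecorator`, the method `partitionedTableSpec`, and the table spec `my-project:my_dataset.my_table` are hypothetical placeholders.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class PartitionDecorator {
    // Day-level partition id (yyyyMMdd). The UTC zone is an assumption of this sketch.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Appends "$yyyyMMdd" to a table spec of the form "project:dataset.table".
    static String partitionedTableSpec(String baseTableSpec, Instant timestamp) {
        return baseTableSpec + "$" + DAY.format(timestamp);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2019-08-19T00:23:00Z");
        // Prints: my-project:my_dataset.my_table$20190819
        System.out.println(partitionedTableSpec("my-project:my_dataset.my_table", ts));
    }
}
```

This only confirms that the destination string has the expected `<table>$<yyyyMMdd>` shape. It does not exercise table creation, which is where the 400 error is reported.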

When I deploy the pipeline, I can see the log statements in Stackdriver; however, the messages are not inserted into the BigQuery table, and I get the following error:

Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity:  "WARNING"  

So it looks like the pipeline is not able to create the table, which causes the inserts to fail. Do I need to change the pipeline definition to make this work? If not, is there another way to create partitioned tables programmatically?

I am using Apache Beam 2.0.0.

1 answer
走好不送
#2 · 2019-08-19 01:24

This was a bug in BigQueryIO and it has been fixed in Beam 2.2. You can use a snapshot version of Beam, or wait until release 2.2 is finalized (the release process is currently in progress).
