I am writing a Cloud Dataflow pipeline that reads messages from Pub/Sub and stores them in BigQuery. I want to use a date-partitioned table, and I am using the timestamp associated with each message to determine which partition the message should go into. Below is my code:
    BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                log.info("Row value : {}", value.getValue());
                // Derive the partition suffix (yyyyMMdd) from the element's timestamp.
                Instant timestamp = value.getTimestamp();
                String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
                // Address the partition with the "$yyyyMMdd" decorator.
                TableDestination td = new TableDestination(
                    "<project>:<dataset>.<table>" + "$" + partition, null);
                log.info("Table Destination : {}", td);
                return td;
            }
        })
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withSchema(tableSchema);
When I deploy the pipeline, I can see the log statements in Stackdriver. However, the messages are not inserted into the BigQuery table, and I get the following error:
Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity: "WARNING"
So it looks like the pipeline is unable to create the table, which causes the inserts to fail. Do I need to change the pipeline definition to make this work? If not, is there another way to create the partitioned table programmatically?
I am using Apache Beam 2.0.0.
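For reference, the table spec built inside `apply` can be checked in isolation from Beam. The following is only a minimal sketch: it uses `java.time` rather than the Joda-Time classes in my pipeline, it assumes the partition date should be computed in UTC, and the class name `PartitionDecorator`, the method `tableSpecFor`, and the table name `my-project:my_dataset.my_table` are hypothetical placeholders, not part of any library.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only; not part of Beam or BigQuery.
final class PartitionDecorator {
    // Formats the timestamp as yyyyMMdd. UTC is an assumption made for this sketch.
    private static final DateTimeFormatter PARTITION_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Appends the "$yyyyMMdd" partition decorator to a "project:dataset.table" spec.
    static String tableSpecFor(String baseTableSpec, Instant timestamp) {
        return baseTableSpec + "$" + PARTITION_FORMAT.format(timestamp);
    }

    public static void main(String[] args) {
        Instant timestamp = Instant.parse("2017-06-15T23:59:59Z");
        // Prints: my-project:my_dataset.my_table$20170615
        System.out.println(tableSpecFor("my-project:my_dataset.my_table", timestamp));
    }
}
```

This matches the destination strings I see in the log output, so the decorator itself appears to be formed as intended; the failure seems to occur later, at the table creation request.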