Inserting repeated records into BigQuery with Java

Posted 2019-07-26 14:23

I have data with repeated key-value (String, String) record pairs as one of the fields in a BigQuery table schema.

I am trying to add these repeated records using the approach here: http://sookocheff.com/post/bigquery/creating-a-big-query-table-java-api/

The table schema created for the repeated record field looks like this:

TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
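For reference, the Java snippet above should correspond to a BigQuery JSON schema fragment along these lines (a sketch built from the field names in the snippet, not taken from the original post):

```json
[
  {
    "name": "rawFields",
    "type": "RECORD",
    "mode": "REPEATED",
    "fields": [
      {"name": "key", "type": "STRING"},
      {"name": "value", "type": "STRING"}
    ]
  }
]
```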

I am inserting data like this as part of a DoFn:

Map<String,String> record = ... // key-value pairs
List<TableRow> rawFields = new ArrayList<>();
record.forEach((k,v)->
    rawFields.add(new TableRow().set("key",k).set("value", v))
);
TableRow row = new TableRow();
// row has other fields, omitted here
row.set("rawFields", rawFields);

The DoFn is in my dataflow pipeline just before the BigQueryIO.Write:

.apply(BigQueryIO.Write
        .named("WriteLBLogLines")
        .to("xxx:yyy.zzz")
        .withSchema(mySchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

When I try and run this through Dataflow I get the following error:

errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0 at file: gs://xxxxxxxxxx/12271006010671464167/dax-tmp-2016-06-28_14_47_26-12271006010671462904-S04-1-303c4f638f6b411b/-shard-00002-of-00003-try-021aff4c448b3177-endshard.json. Repeated field must be imported as a JSON array. Field: rawFields.
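The "Repeated field must be imported as a JSON array" part of the error means that, in the newline-delimited JSON load file BigQuery generates, each row's rawFields must serialize as a JSON array of objects. A correctly shaped row would look roughly like this (hypothetical values, other fields omitted):

```json
{"rawFields": [{"key": "foo", "value": "bar"}, {"key": "baz", "value": "qux"}]}
```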

What is wrong with my approach to inserting repeated records?

1 Answer

走好不送 · 2019-07-26 14:45

I've attempted to reproduce the problem with the following code, but it executes successfully. Are there other aspects of the schema that could be at issue?

List<TableFieldSchema> fields = new ArrayList<>();
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
fields.add(column);
TableSchema schema = new TableSchema().setFields(fields);

TableRow row = new TableRow();
List<TableRow> rawFields = new ArrayList<>();
rawFields.add(new TableRow().set("key","foo").set("value", "bar"));
row.set("rawFields", rawFields);

Pipeline p = Pipeline.create(options);
PCollection<TableRow> c =
    p.apply(Create.of(row, row).withCoder(TableRowJsonCoder.of()));
c.apply(BigQueryIO.Write.named("BigQuery-Write")
        .to(options.getOutput())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withSchema(schema));
p.run();