我有工作在数据流从Bigtable的数据导入到BigQuery的使用内置的数据流的API两种。 我有两个问题:
问题1:如果源数据是在一个大表Bigtable中,我怎么能它分为BigQuery中的一组子或小表的动态基础上,也就是说,只有在运行时知道给定的Bigtable行键?
在数据流的Java代码如下所示:
p.apply(Read.from(CloudBigtableIO.read(config)))
.apply(ParDo.of(new SomeDoFNonBTSourceData()))
.apply(BigQueryIO.Write
.to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
所以,既然BQ_TableName
有在代码层面上提供,我怎么能为其提供编程基础上所看到里面SomeDoFNonBTSourceData
,就像一个范围目前RowKey的价值观? 如果RowKey是“交流”,那么TableA的,如果“东风”,那么表B,等等。
问题2:什么是Bigtable的时间戳导出至BigQuery,以便在BigQuery中的人类可读的格式,最终重建正确的方式?
在DOFN内的processElement功能如下:
public void processElement(ProcessContext c)
{
String valA = new String(c.element().getColumnLatestCell(COL_FAM, COL_NAME).getValueArray());
Long timeStamp = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp();
tr.put("ColA", valA);
tr.put("TimeStamp",timeStamp);
c.output(tr);
}
与管道建设过程中,BQ模式设置为timestamp列如下:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);
因此Bigtable的时间戳似乎型的Long
,我曾经尝试都"TIMESTAMP"
和"INTEGER"
类型的BQ目的地时间戳列(好像有在BQ不只要这样)。 最后,我需要“通过顺序”子句BQ既使用时间戳列,并显示在人类可读的形式(日期和时间)的信息。 “订单由”部分似乎工作确定,但我还没有设法最终结果浇铸成任何有意义的 - 要么得到铸错误或东西仍然无法读取。