Partitioning a table

Posted 2019-03-05 19:26

BigQuery allows partitioning only by date, at this time.

Let's suppose I have a table with 1 billion rows and an inserted_timestamp field, and that this field holds dates going back one year.

What is the right way to move existing data to a new partitioned table?
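For reference, BigQuery addresses one partition of a date-partitioned table by appending a `$YYYYMMDD` decorator to the table name, so a year of `inserted_timestamp` values maps to roughly 365 daily partitions. A minimal sketch, assuming a plan of one load or query per day; `daily_partition_decorators` and the table name are hypothetical:

```python
from datetime import date, timedelta
from typing import List


def daily_partition_decorators(table: str, start: date, end: date) -> List[str]:
    """Return one 'table$YYYYMMDD' partition decorator per day in [start, end]."""
    decorators = []
    day = start
    while day <= end:
        # The decorator names a single daily partition of a date-partitioned table.
        decorators.append(f"{table}${day:%Y%m%d}")
        day += timedelta(days=1)
    return decorators


if __name__ == "__main__":
    parts = daily_partition_decorators("my_dataset.my_table", date(2018, 3, 5), date(2019, 3, 4))
    print(len(parts), parts[0], parts[-1])
```

Each decorator could then be the destination of a job that copies only that day's rows; whether one job per day is practical for a billion rows depends on cost and quotas, which this sketch ignores.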

Edited

I saw there is an elegant solution in Java for Beam versions < 2.0 (Sharding BigQuery output tables), also elaborated in BigQuery partitioning with Beam streams: parametrize the table name (or partition suffix) by the window of the data.

But I can't find BigQueryIO.Write in the Beam 2.x project, and there are no samples showing how to get the window time from a Python serializable function.
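To parametrize the table by window, what matters is the window's start time. With `beam.window.FixedWindows(size)` and the default offset of 0, an element with timestamp `ts` falls into the window starting at `ts - ts % size`. A minimal plain-Python sketch of that arithmetic, using epoch seconds instead of Beam `Timestamp` objects; both function names are hypothetical:

```python
from datetime import datetime, timezone


def fixed_window_start(ts: float, size: int) -> int:
    """Start (epoch seconds) of the size-second fixed window containing ts.

    Assumes windows aligned to the Unix epoch (offset 0): start = ts - ts % size.
    """
    return int(ts - ts % size)


def partition_suffix_for_window(ts: float, size: int) -> str:
    """YYYYMMDD (UTC) of the window containing ts, usable as a table/partition suffix."""
    start = fixed_window_start(ts, size)
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y%m%d")


if __name__ == "__main__":
    ts = 1551813990  # 2019-03-05 19:26:30 UTC
    print(fixed_window_start(ts, 60), partition_suffix_for_window(ts, 60))
    print(86400 // 60, "one-minute windows per daily partition")
```

With the 60-second windows used in the pipeline below, each day of data spans 1440 windows that all map to the same daily suffix; a window of 86400 seconds would give one window per day instead.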

I tried creating the partitions in the pipeline, but it fails with a large number of partitions (it runs with 100 but fails with 1000).

This is as far as I got with my code:

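    import apache_beam as beam

    # p, input_table, output_table, my_dataset, project, AddTimestampDoFn and
    # to_table_row are defined elsewhere in my script.
    (  p
     | 'lectura' >> beam.io.ReadFromText(input_table)              # read the input lines
     | 'noheaders' >> beam.Filter(lambda s: s[:1].isdigit())      # drop header (and empty) lines
     | 'addtimestamp' >> beam.ParDo(AddTimestampDoFn())            # set each element's event time
     | 'window' >> beam.WindowInto(beam.window.FixedWindows(60))   # 60-second windows
     | 'table2row' >> beam.Map(to_table_row)
     | 'write2table' >> beam.io.Write(beam.io.BigQuerySink(
             output_table,   # <-- unable to parametrize by window
             dataset=my_dataset,
             project=project,
             schema='dia:DATE, classe:STRING, cp:STRING, import:FLOAT',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )

    p.run()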
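The missing piece in this pipeline is a destination computed per element instead of the fixed `output_table`. A minimal sketch of such a function, assuming `to_table_row` produces dicts whose `dia` field is an ISO date string (as the `dia:DATE` column suggests); `table_for_row` and the project/dataset/table defaults are hypothetical placeholders. Newer Beam Python SDKs let `beam.io.WriteToBigQuery` take a callable as its `table` argument, so check whether your SDK version supports that before relying on it.

```python
from datetime import date


def table_for_row(row: dict, project: str = "my-project",
                  dataset: str = "my_dataset", table: str = "my_table") -> str:
    """Destination 'project:dataset.table$YYYYMMDD' for one row, based on its 'dia' field."""
    day = date.fromisoformat(row["dia"])  # 'dia' assumed to be an ISO date, e.g. '2018-03-05'
    return f"{project}:{dataset}.{table}${day:%Y%m%d}"


if __name__ == "__main__":
    row = {"dia": "2018-03-05", "classe": "A", "cp": "08001", "import": 12.5}
    print(table_for_row(row))
```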

1 answer
Answer, posted 2019-03-05 19:57

All of the functionality necessary to do this exists in Beam, although it may currently be limited to the Java SDK.

You would use BigQueryIO. Specifically, you may use DynamicDestinations to determine a destination table for each row.

From the example of DynamicDestinations:

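    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    // events is a PCollection<UserEvent>; tableForUser, tableSchemaForUser and
    // convertUserEventToTableRow are user-defined helpers.
    events.apply(BigQueryIO.<UserEvent>write()
        .to(new DynamicDestinations<UserEvent, String>() {
          // Pick a destination key per element; element.getWindow() is also available here.
          @Override
          public String getDestination(ValueInSingleWindow<UserEvent> element) {
            return element.getValue().getUserId();
          }
          @Override
          public TableDestination getTable(String user) {
            return new TableDestination(tableForUser(user),
                "Table for user " + user);
          }
          @Override
          public TableSchema getSchema(String user) {
            return tableSchemaForUser(user);
          }
        })
        .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() {
          @Override
          public TableRow apply(UserEvent event) {
            return convertUserEventToTableRow(event);
          }
        }));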
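The same three-step idea (a destination key per element, a table per key, a schema per key) can be mirrored in plain Python to see how rows would be routed into daily partitions. This is a hypothetical sketch, not a Beam API: `DailyDestinations` and `route` are illustrative names, and rows are assumed to carry an ISO `dia` date as in the question's schema.

```python
from collections import defaultdict


class DailyDestinations:
    """Hypothetical Python mirror of Java's DynamicDestinations, keyed by day."""

    def __init__(self, table: str):
        self.table = table

    def get_destination(self, row: dict) -> str:
        # Destination key: the row's day, e.g. '2018-03-05' -> '20180305'.
        return row["dia"].replace("-", "")

    def get_table(self, key: str) -> str:
        # One daily partition per key, addressed with a $YYYYMMDD decorator.
        return f"{self.table}${key}"

    def get_schema(self, key: str) -> str:
        # Same schema for every partition in this example.
        return "dia:DATE, classe:STRING, cp:STRING, import:FLOAT"


def route(rows, destinations: DailyDestinations) -> dict:
    """Group rows by destination table, as a per-destination writer would."""
    tables = defaultdict(list)
    for row in rows:
        key = destinations.get_destination(row)
        tables[destinations.get_table(key)].append(row)
    return dict(tables)
```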