Strategy for loading data into BigQuery and Google

Posted 2019-07-27 11:30

I have two years of combined data, around 300GB in size, on my local disk, which I extracted from Teradata. I have to load this data into both Google Cloud Storage and a BigQuery table.

The final data in Google Cloud Storage should be segregated by day and compressed (each day should be a single file in gz format). I also have to load the data into a day-partitioned BigQuery table, i.e. each day's data should be stored in one partition.

I first loaded the combined two years of data into Google Cloud Storage. Then I tried using Google Dataflow to segregate the data by day, using Dataflow's partitioning, and write it back to Google Cloud Storage (FYI, Dataflow partitioning is different from BigQuery partitioning). But Dataflow did not allow me to create 730 partitions (for 2 years): it hit 413 Request Entity Too Large ("The size of serialized JSON representation of the pipeline exceeds the allowable limit").

So I ran the Dataflow job twice, once per year. Each run filtered one year's data and wrote it into separate files in Google Cloud Storage, but it could not compress them, as Dataflow currently cannot write compressed files.

Seeing the first approach fail, I thought of filtering one year's data from the combined data using partitioning in Dataflow as explained above, writing it directly to BigQuery, and then exporting it to Google Cloud Storage in compressed format. This process would have been repeated twice. But with this approach I could not write more than 45 days of data at once, as I repeatedly hit java.lang.OutOfMemoryError: Java heap space. So this strategy also failed.

Any help in figuring out a strategy for a date-wise segregated migration to Google Cloud Storage (in compressed format) and BigQuery would be greatly appreciated.

2 answers
劫难
#2 · 2019-07-27 11:53

Let's see if this helps.

Steps + pseudo code

1 - Upload the combined data (300GB) to BigQuery, into a CombinedData table

2 - Split Years (Cost 1x2x300GB = 600GB)

SELECT * FROM CombinedData WHERE year = year1 -> write to DataY1 table  
SELECT * FROM CombinedData WHERE year = year2 -> write to DataY2 table  

3 - Split to 6 months (Cost 2x2x150GB = 600GB)

SELECT * FROM DataY1 WHERE month in (1,2,3,4,5,6) -> write to DataY1H1 table
SELECT * FROM DataY1 WHERE month in (7,8,9,10,11,12) -> write to DataY1H2 table
SELECT * FROM DataY2 WHERE month in (1,2,3,4,5,6) -> write to DataY2H1 table
SELECT * FROM DataY2 WHERE month in (7,8,9,10,11,12) -> write to DataY2H2 table

4 - Split to 3 months (Cost 4x2x75GB = 600GB)

SELECT * FROM DataY1H1 WHERE month in (1,2,3) -> write to DataY1Q1 table
SELECT * FROM DataY1H1 WHERE month in (4,5,6) -> write to DataY1Q2 table
SELECT * FROM DataY1H2 WHERE month in (7,8,9) -> write to DataY1Q3 table
SELECT * FROM DataY1H2 WHERE month in (10,11,12) -> write to DataY1Q4 table

SELECT * FROM DataY2H1 WHERE month in (1,2,3) -> write to DataY2Q1 table
SELECT * FROM DataY2H1 WHERE month in (4,5,6) -> write to DataY2Q2 table
SELECT * FROM DataY2H2 WHERE month in (7,8,9) -> write to DataY2Q3 table
SELECT * FROM DataY2H2 WHERE month in (10,11,12) -> write to DataY2Q4 table

5 - Split each quarter into 1 and 2 months (Cost 8x2x37.5GB = 600GB)

SELECT * FROM DataY1Q1 WHERE month = 1 -> write to DataY1M01 table
SELECT * FROM DataY1Q1 WHERE month in (2,3) -> write to DataY1M02-03 table
SELECT * FROM DataY1Q2 WHERE month = 4 -> write to DataY1M04 table
SELECT * FROM DataY1Q2 WHERE month in (5,6) -> write to DataY1M05-06 table  

Same for rest of Y(1/2)Q(1-4) tables

6 - Split all double-month tables into separate month tables (Cost 8x2x25GB = 400GB)

SELECT * FROM DataY1M02-03 WHERE month = 2 -> write to DataY1M02 table
SELECT * FROM DataY1M02-03 WHERE month = 3 -> write to DataY1M03 table
SELECT * FROM DataY1M05-06 WHERE month = 5 -> write to DataY1M05 table
SELECT * FROM DataY1M05-06 WHERE month = 6 -> write to DataY1M06 table

Same for the rest of Y(1/2)M(XX-YY) tables

7 - Finally, you have 24 monthly tables. I hope the limitations you are facing will now be gone, so you can proceed with your plan (the second approach, let's say) and split further into daily tables.

I think that, cost-wise, this is the most optimal approach, and the final querying cost is
(assuming billing tier 1)

4x600GB + 400GB = 2800GB = $14 
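
The arithmetic behind that total can be checked with a small sketch. The class and method names are hypothetical, and the price of $5 per 1000GB scanned is an assumption inferred from the answer's own figure (2800GB = $14), not a quoted price list.

```java
final class SplitCostEstimate {
    // Assumption: price implied by the answer's total (2800 GB -> $14).
    static final double USD_PER_1000_GB = 5.0;

    /** GB scanned by one split step: each source table is read once per query run against it. */
    static double stepScanGb(int sourceTables, int queriesPerTable, double gbPerTable) {
        return sourceTables * queriesPerTable * gbPerTable;
    }

    /** Sum of the five split steps listed in the answer. */
    static double totalScanGb() {
        return stepScanGb(1, 2, 300.0)   // step 2: split into years
             + stepScanGb(2, 2, 150.0)   // step 3: split into half-years
             + stepScanGb(4, 2, 75.0)    // step 4: split into quarters
             + stepScanGb(8, 2, 37.5)    // step 5: split quarters into 1-month and 2-month tables
             + stepScanGb(8, 2, 25.0);   // step 6: split 2-month tables into months
    }

    static double costUsd(double scannedGb) {
        return scannedGb * USD_PER_1000_GB / 1000.0;
    }

    public static void main(String[] args) {
        double gb = totalScanGb();
        System.out.printf("%.0f GB scanned, about $%.2f%n", gb, costUsd(gb));
    }
}
```

Each halving step rescans the full 300GB twice, which is why steps 2 to 5 cost the same 600GB each; only the last step is cheaper, because it touches just the eight 2-month tables.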

Of course, don't forget to delete the intermediate tables.

Note: I am not happy with this plan, but if splitting your original file into daily chunks outside of BigQuery is not an option, this can help.
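
If splitting outside of BigQuery does turn out to be an option, a minimal sketch of that alternative using only the Java standard library might look like the following. It is not part of the plan above. It assumes (hypothetically) that the extract is a line-oriented delimited text file, that one column holds the day in a filename-safe form such as 2018-01-31, and that no field contains the delimiter; the class and method names are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;

final class DailyGzipSplitter {
    /**
     * Splits a delimited text file into one gzip file per day (outDir/<day>.gz).
     * Assumption: the day is the dateColumn-th field (0-based) of every line.
     * Returns the number of day files written.
     */
    static int split(Path input, Path outDir, String delimiter, int dateColumn) throws IOException {
        Files.createDirectories(outDir);
        String splitRegex = Pattern.quote(delimiter);
        // One open gzip writer per day seen so far.
        Map<String, BufferedWriter> writers = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(input, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String day = line.split(splitRegex, -1)[dateColumn];
                BufferedWriter writer = writers.get(day);
                if (writer == null) {
                    writer = new BufferedWriter(new OutputStreamWriter(
                        new GZIPOutputStream(Files.newOutputStream(outDir.resolve(day + ".gz"))),
                        StandardCharsets.UTF_8));
                    writers.put(day, writer);
                }
                writer.write(line);
                writer.write('\n');
            }
        } finally {
            // Closing each writer finishes its gzip stream.
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
        return writers.size();
    }
}
```

This keeps one file handle open per day, so two years of data means roughly 730 open files; on systems with a low open-file limit the input may need to be processed one year at a time. The resulting files could then be uploaded to Google Cloud Storage and loaded into BigQuery day by day.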

男人必须洒脱
#3 · 2019-07-27 11:54

Currently, partitioning the results is the best way to produce multiple output files/tables. What you're likely running into is the fact that each write allocates a buffer for the uploads, so if you have a partition followed by N writes, there are N buffers.
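
To get a feel for why N buffers can exhaust the heap, here is a rough back-of-the-envelope sketch. The 64 MiB and 1 MiB buffer sizes are illustrative assumptions, not values taken from this answer, and the class name is hypothetical; check the actual buffer size your SDK version uses.

```java
final class UploadBufferEstimate {
    /** Memory held by upload buffers when this many writes are open at the same time. */
    static long bufferBytes(int concurrentWrites, long bufferSizeBytes) {
        return concurrentWrites * bufferSizeBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Assumed buffer size of 64 MiB per write (illustrative only).
        System.out.println(bufferBytes(730, 64 * mib) / mib + " MiB for 730 writes at 64 MiB each");
        System.out.println(bufferBytes(45, 64 * mib) / mib + " MiB for 45 writes at 64 MiB each");
        // The same 730 writes with a hypothetical 1 MiB buffer.
        System.out.println(bufferBytes(730, 1 * mib) / mib + " MiB for 730 writes at 1 MiB each");
    }
}
```

Under those assumed sizes, the total grows linearly with the number of simultaneous writes, so either shrinking the buffer or reducing how many writes run at once brings the footprint down.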

There are two strategies for making this work.

  1. You can reduce the size of the upload buffers using the uploadBufferSizeBytes option in GcsOptions. Note that this may slow down the uploads since the buffers will need to be flushed more frequently.
  2. You can apply a Reshuffle operation to each PCollection after the partition. This will limit the number of BigQuery sinks running concurrently, so fewer buffers will be allocated.

For example, you could do something like:

PCollection<Data> allData = ...;
PCollectionList<Data> partitions = allData.apply(Partition.of(...));

// Assuming the partitioning function has produced numDays partitions,
// and those can be mapped back to the day in some meaningful way:
for (int i = 0; i < numDays; i++) {
  String outputName = nameFor(i); // compute the output name
  partitions.get(i)
    .apply("Write_" + outputName, new ReshuffleAndWrite(outputName));
}

That makes use of these two helper PTransforms:

private static class Reshuffle<T>
  extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> apply(PCollection<T> in) {
    return in
      .apply("Random Key", WithKeys.of(
          new SerializableFunction<T, Integer>() {
            @Override
            public Integer apply(T value) {
              return ThreadLocalRandom.current().nextInt();
            }
          }))
      .apply("Shuffle", GroupByKey.<Integer, T>create())
      .apply("Remove Key", Values.<T>create());
  }
}

private static class ReshuffleAndWrite 
  extends PTransform<PCollection<Data>, PDone> {

  private final String outputName;
  public ReshuffleAndWrite(String outputName) {
    this.outputName = outputName;
  }

  @Override
  public PDone apply(PCollection<Data> in) {
    return in
      .apply("Reshuffle", new Reshuffle<Data>())
      .apply("Write", BigQueryIO.Write.to(tableNameFor(outputName))
        .withSchema(schema)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
  }
}
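
The snippets above leave the partitioning function and nameFor(i) unspecified. One possible way to map days to partition indices and back, using only java.time, is sketched below. The class DayPartitions, its methods, and the date range in the usage note are hypothetical; the only BigQuery-specific detail is the table$YYYYMMDD decorator used to address one daily partition, and whether a given SDK version's sink accepts such a name should be verified separately.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class DayPartitions {
    private final LocalDate firstDay;
    private final int numDays;

    DayPartitions(LocalDate firstDay, LocalDate lastDayInclusive) {
        this.firstDay = firstDay;
        this.numDays = (int) ChronoUnit.DAYS.between(firstDay, lastDayInclusive) + 1;
    }

    /** Number of partitions to request from the partitioning step. */
    int numDays() {
        return numDays;
    }

    /** Partition index for a record's day: 0 for the first day, numDays - 1 for the last. */
    int indexFor(LocalDate day) {
        long index = ChronoUnit.DAYS.between(firstDay, day);
        if (index < 0 || index >= numDays) {
            throw new IllegalArgumentException("Date outside range: " + day);
        }
        return (int) index;
    }

    /** Inverse mapping, usable as an output name for partition i, e.g. "20170101". */
    String nameFor(int index) {
        return firstDay.plusDays(index).format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    /** Name of one daily partition of a day-partitioned table, e.g. "mytable$20170101". */
    String partitionOf(String table, int index) {
        return table + "$" + nameFor(index);
    }
}
```

For example, with a hypothetical range of 2017-01-01 to 2018-12-31, numDays() is 730, indexFor(LocalDate.of(2018, 1, 1)) is 365, and nameFor(729) is "20181231".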