I have created a Dataflow pipeline that reads from Datastore and applies a transform to convert each entity to a BigQuery TableRow. A second transform attaches a timestamp to each element, and a one-day window is then applied to the PCollection. The windowed output is written to a partition of a BigQuery table using Apache Beam's BigQueryIO.
Before writing to BigQuery, the pipeline reshuffles via a random key as an intermediate step to prevent fusion.
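To make the day-window-to-partition mapping concrete, here is a minimal plain-Java sketch of how an element timestamp and a time zone select a daily partition. It is only an illustration: `DayPartitionSketch`, `partitionFor` and the table name `events` are hypothetical and are not the `TableRefPartition.perDay` helper used in the pipeline. It assumes the target is addressed with BigQuery's `table$YYYYMMDD` partition decorator.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only (not the pipeline's TableRefPartition).
final class DayPartitionSketch {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Returns "table$YYYYMMDD" for the calendar day that contains ts in the given zone. */
    static String partitionFor(String table, Instant ts, ZoneId zone) {
        String day = ts.atZone(zone).toLocalDate().format(DAY);
        return table + "$" + day;
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2018-04-04T23:30:00Z");
        // The same instant can fall on different calendar days depending on the zone.
        System.out.println(partitionFor("events", ts, ZoneId.of("UTC")));
        System.out.println(partitionFor("events", ts, ZoneId.of("Asia/Kolkata")));
    }
}
```

The point of the sketch is that the time zone passed to the window decides which day, and therefore which partition, an element near midnight ends up in.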
The pipeline behaves as follows:
1. For 2.8 million entities in the input:
   Total vCPU time: 5.148 vCPU hr
   Time to complete the job: 53 min 9 sec
   Current workers: 27
   Target workers: 27
   Job ID: 2018-04-04_04_20_34-1951473901769814139
2. For 7 million entities in the input:
   Total vCPU time: 247.772 vCPU hr
   Time to complete the job: 3 hr 45 min
   Current workers: 69
   Target workers: 1000
   Job ID: 2018-04-02_21_59_47-8636729278179820259
I can't understand why the second case takes so much longer to finish and consumes so many more vCPU hours.
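To put numbers on the gap, here is a quick back-of-the-envelope check in plain Java. The figures are taken from the two runs above; the class and method names are just for illustration.

```java
import java.time.Duration;
import java.util.Locale;

final class ScalingCheck {
    /** Ratio of a figure from the larger run to the same figure from the smaller run. */
    static double ratio(double larger, double smaller) {
        return larger / smaller;
    }

    public static void main(String[] args) {
        double entities = ratio(7_000_000, 2_800_000);
        double vcpuHours = ratio(247.772, 5.148);

        Duration smallRun = Duration.ofMinutes(53).plusSeconds(9);
        Duration largeRun = Duration.ofHours(3).plusMinutes(45);
        double wallTime = ratio(largeRun.getSeconds(), smallRun.getSeconds());

        System.out.printf(Locale.ROOT,
            "entities x%.2f, vCPU hours x%.2f, wall time x%.2f%n",
            entities, vcpuHours, wallTime);
    }
}
```

The input grows by a factor of 2.5, while the vCPU hours grow by roughly 48 times and the wall-clock time by roughly 4.2 times, so the cost is clearly not scaling linearly with the number of entities.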
At a high level, the Dataflow pipeline is:
    // Read from Datastore
    PCollection<Entity> entities =
        pipeline.apply("ReadFromDatastore",
            DatastoreIO.v1().read().withProjectId(options.getProject())
                .withQuery(query).withNamespace(options.getNamespace()));

    // Convert each entity to a BigQuery TableRow
    PCollection<TableRow> tableRow =
        entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));

    // Attach a timestamp to each TableRow, then apply a one-day window
    PCollection<TableRow> tableRowWindowTemp =
        tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
            "tableApplyWindow",
            Window.<TableRow>into(CalendarWindows.days(1).withTimeZone(
                DateTimeZone.forID(options.getTimeZone()))));

    // Reshuffle via a random key to prevent fusion
    PCollection<TableRow> tableRowWindow =
        tableRowWindowTemp.apply("ReshuffleViaRandomKey",
            Reshuffle.<TableRow>viaRandomKey());

    // Write the windowed, reshuffled output to BigQuery partitions
    tableRowWindow.apply(
        "WriteTableToBQ",
        BigQueryIO
            .writeTableRows()
            .withSchema(BigqueryHelper.getSchema())
            .to(TableRefPartition.perDay(options.getProject(),
                options.getBigQueryDataset(), options.getTableName()))
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
I saw that you posted a similar question here, and that you have since added this step to your code:
As someone already told you in the other question:
So in this case it looks like something similar is still happening. You can find further information about hot key problems here:
If that is the case and a worker is stuck on a hot key, then the time to complete the job does not have to scale linearly with the number of entities. Reducing the vCPU consumption is then mostly a matter of changing the code to avoid the hot key.
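As a rough illustration of why a hot key breaks linearity, here is a toy simulation in plain Java. It is a sketch under simplifying assumptions and not a model of Dataflow's actual scheduler: every element with the hot key is assumed to land on one worker (worker 0), all other elements are spread uniformly, and the names `HotKeySkewSketch` and `maxWorkerLoad` are hypothetical.

```java
import java.util.Random;

// Toy model only: it counts elements per worker, nothing more.
final class HotKeySkewSketch {
    /**
     * Returns the element count of the busiest worker. The job cannot finish
     * before that worker does, so this bounds the completion time from below.
     */
    static long maxWorkerLoad(int elements, int workers, double hotKeyFraction, long seed) {
        Random random = new Random(seed);
        long[] load = new long[workers];
        for (int i = 0; i < elements; i++) {
            // Hot-key elements always go to worker 0; the rest are spread at random.
            boolean hot = random.nextDouble() < hotKeyFraction;
            int worker = hot ? 0 : random.nextInt(workers);
            load[worker]++;
        }
        long max = 0;
        for (long l : load) {
            max = Math.max(max, l);
        }
        return max;
    }

    public static void main(String[] args) {
        int elements = 1_000_000;
        int workers = 50;
        System.out.println("balanced max load: " + maxWorkerLoad(elements, workers, 0.0, 42L));
        System.out.println("skewed max load:   " + maxWorkerLoad(elements, workers, 0.8, 42L));
    }
}
```

With evenly spread keys the busiest worker handles close to `elements / workers`, whereas with 80% of the elements on one key a single worker handles most of the input while the others sit idle, and adding more workers does not help.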