I have a Dataflow pipeline that builds an index (key-value pairs) and computes some metrics, such as the number of values per key. The input data is about 60 GB total, stored on GCS, and the pipeline has about 126 workers allocated. Per Stackdriver, all workers sit at about 6% CPU utilization.
The pipeline makes almost no progress despite the 126 workers, and based on wall time the bottleneck appears to be a simple counting step that follows a group-by. While every other step has spent less than an hour on average, the counting step has already accumulated 50 days of wall time. There is no helpful information or warning in the logs.
The counting step was implemented following a corresponding step in the WordCount example:
def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    count = 0
    for e in group:
        count += 1
    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)
    return (key, count)
The preceding step "Group keywords" is a simple beam.GroupByKey() transformation.
Please advise what might be the reason and how this can be optimized.
Current resource metrics:
Current vCPUs: 126
Total vCPU time: 1,753.649 vCPU hr
Current memory: 472.5 GB
Total memory time: 6,576.186 GB hr
Current PD: 3.08 TB
Total PD time: 43,841.241 GB hr
Current SSD PD: 0 B
Total SSD PD time: 0 GB hr
Total Shuffle data processed: 1.03 TB
Billable Shuffle data processed: 529.1 GB
The pipeline steps, including the counting one, can be seen below:
The best way to get a count per key here is to use a combine operation. Unlike a GroupByKey followed by a ParDo, a combiner lets workers pre-aggregate values before the shuffle, so a single hot key no longer forces all of its values through one worker; this alleviates the hot-key problem you are seeing.
Try replacing your GroupByKey + ParDo with beam.combiners.Count.PerKey, or a similar combine transform that suits your use case.
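A minimal sketch of what the replacement could look like, assuming the input is a PCollection of (product, keyword) pairs; the RecordStats DoFn, the keyed_keywords name, and the step labels are hypothetical and only illustrate where your existing counter and distribution metrics could still be updated:

import apache_beam as beam
from apache_beam.metrics import Metrics

class RecordStats(beam.DoFn):
    """Hypothetical DoFn: passes (key, count) through while updating metrics."""
    def __init__(self):
        self.product_counter = Metrics.counter(self.__class__, 'product_counter')
        self.keywords_per_product_dist = Metrics.distribution(
            self.__class__, 'keywords_per_product_dist')

    def process(self, key_and_count):
        key, count = key_and_count
        self.product_counter.inc()
        self.keywords_per_product_dist.update(count)
        yield key, count

counts = (
    keyed_keywords  # assumed PCollection of (product, keyword) pairs
    # Count.PerKey is a CombinePerKey, so workers pre-aggregate locally
    # before the shuffle instead of streaming whole groups to one worker.
    | 'Count keywords per product' >> beam.combiners.Count.PerKey()
    | 'Record stats' >> beam.ParDo(RecordStats())
)

Because the combining is lifted into the pre-shuffle stage, the counting work is spread across all workers, which should put your mostly idle vCPUs to use instead of leaving one worker to iterate an entire hot key's group.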