A simple counting step following a group by key is extremely slow

Posted 2019-04-15 09:01

I have a Dataflow pipeline that builds an index (key-value pairs) and computes some metrics (such as the number of values per key). The input data is about 60 GB total, stored on GCS, and the pipeline has about 126 workers allocated. Per Stackdriver, all workers sit at about 6% CPU utilization.

The pipeline seems to make no progress despite the 126 workers, and based on wall time the bottleneck is a simple counting step that follows a group by key. While all other steps have spent less than an hour of wall time on average, the counting step has already accumulated 50 days of wall time. There is no helpful information or warning in the logs.

The counting step was implemented following the corresponding step in the WordCount example:

def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    # Iterate over the grouped values once to count them.
    count = sum(1 for _ in group)

    # Beam metrics: total number of products seen and the
    # distribution of keyword counts per product.
    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)

    return (key, count)

The preceding step, "Group keywords", is a plain beam.GroupByKey() transform; the two steps are wired together roughly as in the simplified snippet below.
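Simplified wiring of these two steps (keyword_pairs stands in for the collection built by the earlier steps):

keyword_counts = (
    keyword_pairs  # PCollection of (product_key, keyword) pairs
    | 'Group keywords' >> beam.GroupByKey()
    | 'Count keywords' >> beam.Map(self.count_keywords_per_product)
)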

Please advise what the reason might be and how this can be optimized.

Current resource metrics:
Current vCPUs: 126
Total vCPU time: 1,753.649 vCPU hr
Current memory: 472.5 GB
Total memory time: 6,576.186 GB hr
Current PD: 3.08 TB
Total PD time: 43,841.241 GB hr
Current SSD PD: 0 B
Total SSD PD time: 0 GB hr
Total Shuffle data processed: 1.03 TB
Billable Shuffle data processed: 529.1 GB

The pipeline steps, including the counting one, can be seen below: [screenshot of the Dataflow job graph]

1 Answer

Bombasti · 2019-04-15 09:34

The best way to get a count per key here is to use a combine operation. Unlike a GroupByKey, a combiner can be partially applied on each worker before the shuffle (combiner lifting), so counts are pre-aggregated locally and only small partial sums are moved between workers. This alleviates the problem of hot keys, where a single worker would otherwise have to iterate over every value of an oversized key.

Try replacing your GroupByKey + ParDo with beam.combiners.Count.PerKey, or a similar combine transform that suits your use case; see the sketch below.
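A minimal sketch, assuming the input is a PCollection of (product_key, keyword) pairs named keyword_pairs (a placeholder, not from your pipeline):

import apache_beam as beam

keyword_counts = (
    keyword_pairs  # PCollection of (product_key, keyword) pairs
    # Combiner: partial counts are computed on each worker before the
    # shuffle, so only small (key, partial_count) pairs are moved.
    | 'Count keywords per product' >> beam.combiners.Count.PerKey()
)

Count.PerKey emits one (key, count) pair per key, the same output as your GroupByKey + counting step; the product_counter and keywords_per_product_dist metrics can then be updated in a lightweight beam.Map over these counts.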
