I noticed that in my jobs the throughput (reported number of records/sec) slows down significantly after a "group by" step. When that workflow step executes, I see that some instances have CPU utilization of ~30%, while some seem to be idle.
Is this a Dataflow issue, or should I somehow instruct the workflow to increase the parallelism of this step?
Thanks, G
The low throughput could also be a result of "hot keys," i.e. very frequently occurring keys. These produce a few extremely large collections, each of which is processed by a single core on a single worker.
Here is Google's official documentation regarding hot keys and how to deal with them. In my experience, selectively applying a fanout factor using Combine.PerKeyWithHotKeyFanout has yielded good results; a sketch follows below.
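For illustration only, here is a minimal sketch using the Apache Beam Java SDK (the successor to the Dataflow SDK). The element types, the Sum combine function, and the fanout value of 16 are assumptions for the example, not details from your pipeline:

```java
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// "events" is assumed to be a PCollection<KV<String, Long>> produced earlier
// in the pipeline.
PCollection<KV<String, Long>> sums =
    events.apply("SumPerKey",
        Combine.<String, Long, Long>perKey(Sum.ofLongs())
            // withHotKeyFanout(16) pre-combines each hot key's values on 16
            // intermediate shards before the final merge, so a single hot key
            // no longer funnels through one core on one worker.
            .withHotKeyFanout(16));
```

Note that withHotKeyFanout only helps for combiner-style aggregations; a plain GroupByKey on a hot key still collects all of that key's values on one worker.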
It's hard to know for sure what's happening without more specifics about what your pipeline is doing.
In general, throughput (number of records/sec) depends on several factors, one of the most important being the size and processing cost of each record.
In general, a GroupByKey constructs a larger record consisting of a key and all values with that key; i.e., the input is a collection of KV<K, V> and the output is a collection of KV<K, Iterable<V>>.
As a result, I'd expect the records output by a GroupByKey to be much larger than the input records. Since the records are larger, they take longer to process, so records/sec tends to decrease.
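To make that shape change concrete, here is a small self-contained sketch in the Beam Java SDK; the String/Long element types and the sample values are illustrative assumptions:

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupByKeyShape {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Input: a collection of KV<K, V> -- many small records.
    PCollection<KV<String, Long>> input = p.apply(
        Create.of(Arrays.asList(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L))));

    // Output: a collection of KV<K, Iterable<V>> -- one record per key holding
    // all of that key's values, hence typically a much larger record.
    PCollection<KV<String, Iterable<Long>>> grouped =
        input.apply(GroupByKey.<String, Long>create());

    p.run().waitUntilFinish();
  }
}
```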
The low CPU utilization is not unexpected in the Alpha release of Dataflow. Right now, Dataflow does not fully take advantage of all of a VM's cores to process work. A number of performance improvements are coming to address this.
Dataflow currently provides two knobs for tuning the amount of parallelism, via the following flags.
--numWorkers allows you to increase or decrease the number of workers used to process your data in parallel. In general, increasing the number of workers allows more data to be processed in parallel.
Using --workerMachineType you can pick a machine type with more or fewer CPUs and more or less RAM.
If you notice your VMs' CPUs being underutilized, you can pick a machine type with fewer CPUs (by default Dataflow uses 4 CPUs per VM). If you reduce the CPUs per machine but increase --numWorkers so that the total number of CPUs stays about the same, you may be able to increase the amount of parallelism without increasing the cost of your job, as sketched below.
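As an illustration only (not the definitive configuration for your job), the same flags can also be set programmatically through Beam's DataflowPipelineOptions; the specific values here are assumptions:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismOptions {
  public static void main(String[] args) {
    // Equivalent to passing --numWorkers=20 --workerMachineType=n1-standard-1
    // on the command line; the values are illustrative.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setNumWorkers(20);                      // 20 workers processing in parallel
    options.setWorkerMachineType("n1-standard-1");  // 1 vCPU per worker instead of 4
    // ... build and run the pipeline with these options ...
  }
}
```

For example, trading four-core workers for four times as many one-core workers keeps the total vCPU count (and roughly the cost) about the same while spreading the work over more workers.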
Right now Dataflow only provides these very coarse knobs for controlling the amount of parallelism at a global level (as opposed to per stage). This might change in the future. However, in general our goal is to tune the amount of parallelism for you automatically, so you don't have to.