We're generating a sequential index in a ParDo using Beam's Java SDK 2.0.0. Just like the simple stateful index example in Beam's introduction to stateful processing, we use a ValueState<Integer> cell, and our only operation on it is to read the value and increment it when we need the next index:
Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);
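For context, the snippet above runs inside a stateful DoFn along the lines of the following sketch (the class name IndexFn and the KV<String, String> input type are ours for illustration; written against the Beam 2.x state API):

```java
import static com.google.common.base.MoreObjects.firstNonNull;

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Assigns a sequential index to each element of a keyed PCollection.
class IndexFn extends DoFn<KV<String, String>, KV<String, Integer>> {

  // One ValueState<Integer> cell is maintained per key-and-window pair.
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("index") ValueState<Integer> index) {
    // Read the current index (0 if the cell is empty) and advance it.
    Integer statefulIndex = firstNonNull(index.read(), 0);
    index.write(statefulIndex + 1);
    c.output(KV.of(c.element().getKey(), statefulIndex));
  }
}
```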
When running with Google's Dataflow runner, we noticed on the Dataflow monitoring interface that the wall time for that ParDo was accumulating at roughly the rate of elapsed time. We confirmed that the ParDo executes single-threaded by ssh'ing into the worker node and using top (pressing 1) to view the CPU usage per core. Commenting out the stateful processing cell while keeping the code otherwise unchanged, the same ParDo uses all cores of our n1-standard-32 worker node.
Even if the Dataflow runner can only parallelize stateful processing per key-and-window pair (we currently have a single key and a single window), the resulting loss of parallelism degrades performance so severely that we are unable to use it. Is this the expected behavior of the Dataflow runner?
Naively, I expected that behind the scenes, Beam's stateful indexing would behave like Java's AtomicInteger. Are there constraints that prevent parallel access to a ValueState<Integer> cell, or is this functionality simply not yet built into the runner?
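To illustrate that mental model: with an AtomicInteger, many threads can draw indexes from a single counter concurrently without losing or duplicating any value, which is the behavior we naively expected from the stateful cell. A minimal stand-alone sketch (class and method names are ours):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIndex {
  // Each of `threads` workers claims `perThread` indexes concurrently.
  static int countTo(int threads, int perThread) throws InterruptedException {
    AtomicInteger index = new AtomicInteger(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < perThread; i++) {
          // Atomic read-and-increment: each value is handed out exactly once,
          // yet all threads run in parallel.
          index.getAndIncrement();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return index.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(countTo(8, 1000)); // prints 8000
  }
}
```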