Stateful indexing causes ParDo to be run single-th

2019-02-26 06:54发布

问题:

We're generating a sequential index in a ParDo using Beam's Java SDK 2.0.0. Just like the simple stateful index example in Beam's introduction to stateful processing we use a ValueState<Integer> cell and our only operation on it is to retrieve the value and increment when we need the next index:

Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);

When running with Google's Dataflow runner, we noticed on the Dataflow monitoring interface that the wall time for that ParDo was accumulating in sync with elapsed time. We were able to confirm that the ParDo executes single-threaded by ssh'ing in to the worker node and using top and 1 to view the CPU usage per core. Commenting out the stateful processing cell and keeping the code otherwise unchanged, the same ParDo uses all cores of our n1-standard-32 worker node.

Even if the Dataflow runner is able to parallelize stateful indexing based on each key and window pair (we currently have one window and one key), the lack of parallelism causes such a significant decrease in performance that we are unable to use it. Is this the expected behavior of the Dataflow runner?

Naively, I expected that behind the scenes, Beam's stateful indexing would operate similarly to Java's AtomicInteger. Are there constraints that prevent parallel processing with a ValueState<Integer> cell or is this functionality just not yet built into the runner?

回答1:

This is not only the expected behavior of the Dataflow runner, but a logical necessity in any context. It doesn't matter if you are using state in Beam or an AtomicInteger in a single-process Java program: if operation "A" writes a value and operation "B" reads the value, then "B" must be executed after "A". The common term for this is relationship is "happens-before".

This form of stateful computation is the opposite of parallel computation. By definition, a read that observes a write has a causal relationship. By definition, two operations that are in parallel do not have a causal relationship.

Now, you are perhaps expecting parallel threads that access the state cell concurrently, as in the standard pattern of multi-threaded programming with some shared state with concurrency control. For this example, if these threads were actually parallel, you would get duplicate indices. Taking a step back, Beam targets massive "embarrassingly parallel" computations transparently distributed across a large cluster of machines. Fine-grained concurrency controls, aside from being extremely difficult to get right, do not readily translate to massive distributed computations.