Remove duplicates across window triggers/firings

2020-02-13 04:34发布

问题:

Let's say I have an unbounded pcollection of sentences keyed by userid, and I want a constantly updated value for whether the user is annoying, we can calculate whether a user is annoying by passing all of the sentences they've ever said into the funcion isAnnoying(). Forever.

I set the window to global with a trigger afterElement(1), accumulatingFiredPanes(), do GroupByKey, then have a ParDo that emits userid,isAnnoying

That works forever, keeps accumulating the state for each user etc. Except it turns out the vast majority of the time a new sentence does not change whether a user isAnnoying, and so most of the times the window fires and emits a userid,isAnnoying tuple it's a redundant update and the io was unnecessary. How do I catch these duplicate updates and drop while still getting an update every time a sentence comes in that does change the isAnnoying value?

回答1:

Today there is no way to directly express "output only when the combined result has changed".

One approach that you may be able to apply to reduce data volume, depending on your pipeline: Use .discardingFiredPanes() and then follow the GroupByKey with an immediate filter that drops any zero values, where "zero" means the identity element of your CombineFn. I'm using the fact that associativity requirements of Combine mean you must be able to independently calculate the incremental "annoying-ness" of a sentence without reference to the history.

When BEAM-23 (cross-bundle mutable per-key-and-window state for ParDo) is implemented, you will be able to manually maintain the state and implement this sort of "only send output when the result changes" logic yourself.

However, I think this scenario likely deserves explicit consideration in the model. It blends the concepts embodied today by triggers and the accumulation mode.