Why did #sideInput() method move from Context to P

Posted 2019-02-24 07:22

Question:

I wonder why the #sideInput() method has moved to the ProcessContext class. Previously I could do some additional processing in the #startBundle() method and cache the result. Doing that in #processElement() sounds less efficient. Of course I could do the preprocessing before passing the data to the view, but there is still the overhead of calling #sideInput() for each element...

Thanks, G

Answer 1:

Great question. The reason is that we added support for windowed PCollections as side inputs. This enables additional scenarios, including using side inputs with unbounded PCollections in streaming mode.

Before the change, we only supported side inputs that were globally windowed, and the entire side input PCollection was available while processing every element of the main input PCollection. This worked fine for bounded PCollections in traditional batch-style processing, but it didn't extend to windowed or unbounded PCollections.

After the change, the window of the current element you are processing in your ParDo controls what subset of the side input is visible. (And so you can't access side inputs in startBundle(), where there is no current element and hence no current window.)

For example, consider a streaming pipeline that processes your website logs and provides real-time updates to a live usage dashboard. You've got two unbounded input PCollections: one contains new user signups and the other contains user clicks. You can identify which user clicks come from new users by windowing both PCollections by hour and doing a ParDo over the user clicks that takes the new user signups as a side input. Now when you process a user click that falls in a given hour, you automatically see just the subset of new user signups from the same hour. You can build variants of this by changing the windowing functions and moving element timestamps forward in time on the side input, for example continuing to window the user clicks per hour while using the new signups from the last 24 hours.
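To make the window matching concrete, here is a minimal plain-Java model of the hourly example above. It deliberately does not use the SDK: the class `WindowedSideInputModel` and its method names are hypothetical, and timestamps are assumed to be milliseconds since some epoch. It only illustrates the idea that a click can see just the signups whose window matches its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a window-scoped side input. Hypothetical names; not SDK code.
final class WindowedSideInputModel {
    static final long HOUR_MILLIS = 60L * 60L * 1000L;

    // Start of the fixed one-hour window that contains the given timestamp.
    static long windowStart(long timestampMillis) {
        return Math.floorDiv(timestampMillis, HOUR_MILLIS) * HOUR_MILLIS;
    }

    // Models the side input: signup user ids grouped by their hourly window.
    static Map<Long, List<String>> signupsByWindow(Map<String, Long> signupTimes) {
        Map<Long, List<String>> byWindow = new HashMap<>();
        for (Map.Entry<String, Long> signup : signupTimes.entrySet()) {
            long window = windowStart(signup.getValue());
            byWindow.computeIfAbsent(window, k -> new ArrayList<>()).add(signup.getKey());
        }
        return byWindow;
    }

    // Models processing one click: only signups in the click's own window are visible.
    static boolean isClickFromNewUser(
            String userId, long clickTimeMillis, Map<Long, List<String>> sideInput) {
        List<String> visible =
                sideInput.getOrDefault(windowStart(clickTimeMillis), Collections.emptyList());
        return visible.contains(userId);
    }
}
```

In this model, a signup by "alice" at 00:10 is visible to her click at 00:50 but not to her click at 01:20, because the second click falls in a different hourly window. This is also why nothing can be looked up without a current element: there is no timestamp from which to derive the window.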

I do agree this change makes it harder to cache any post-processing on your side input. We added View.asMultimap to handle a common case where you turn the Iterable into a lookup table. If your post-processing is element-wise, you can do it with a ParDo before creating the PCollectionView. For anything else, right now I'd recommend doing it lazily from within processElement. I'd be interested in hearing about other patterns that occur, so we can work on ways to make them more efficient.
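One way the lazy approach could look is sketched below in plain Java. This is an assumption about how you might structure it, not an SDK facility: `LazyPerWindowCache` and its methods are hypothetical names. The idea is to key the cached post-processed value by window, so the expensive step runs at most once per window seen by this instance rather than once per element.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: caches a post-processed side input value per window.
// W = window key, R = raw side input type, P = post-processed type.
final class LazyPerWindowCache<W, R, P> {
    private final Map<W, P> cache = new HashMap<>();
    private final Function<R, P> postProcess;
    private int computations = 0;

    LazyPerWindowCache(Function<R, P> postProcess) {
        this.postProcess = postProcess;
    }

    // Returns the cached value for this window, computing it on first use.
    // The supplier is only invoked on a cache miss.
    P get(W window, Supplier<R> rawSideInput) {
        if (!cache.containsKey(window)) {
            cache.put(window, postProcess.apply(rawSideInput.get()));
            computations++;
        }
        return cache.get(window);
    }

    // How many times the post-processing step actually ran.
    int computations() {
        return computations;
    }

    // Drops all cached values, e.g. to bound memory between batches of work.
    void clear() {
        cache.clear();
    }
}
```

Inside a DoFn you would presumably hold such a cache in a field, pass the current element's window as the key, and have the supplier call #sideInput() on the ProcessContext. The cache grows with the number of distinct windows, so some eviction or periodic `clear()` is likely needed in a long-running streaming job.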