Can Dataflow sideInput be updated per window by re

Published 2020-06-28 08:57

Question:

I'm currently creating a PCollectionView by reading filtering information from a GCS bucket and passing it as a side input to different stages of my pipeline in order to filter the output. If the file in the GCS bucket changes, I want the currently running pipeline to use the new filter info. Is there a way to update this PCollectionView on each new window of data if my filter changes? I thought I could do it in startBundle, but I can't figure out how, or whether it's possible at all. Could you give an example if it is possible?

PCollectionView<Map<String, TagObject>> 
    tagMapView =
        pipeline.apply(TextIO.Read.named("TagListTextRead")
                                  .from("gs://tag-list-bucket/tag-list.json"))
                .apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
                .apply("MakeTagMapView", View.asSingleton());
PCollection<String> 
    windowedData =
        pipeline.apply(PubsubIO.Read.topic("myTopic"))
                .apply(Window.<String>into(
                              SlidingWindows.of(Duration.standardMinutes(15))
                                            .every(Duration.standardSeconds(31))));
PCollection<MY_DATA> 
    lineData = windowedData
        .apply(ParDo.named("ExtractJsonObject")
            .withSideInputs(tagMapView)
            .of(new ExtractJsonObjectFn()));

Answer 1:

You probably want something like "use a version of the filter that is at most 1 minute old as a side input". In theory the file can change frequently, unpredictably, and independently of your pipeline, so there's really no way to completely synchronize changes to the file with the behavior of the pipeline.

Here's a (granted, rather clumsy) solution I was able to come up with. It relies on the fact that side inputs are implicitly also keyed by window. In this solution we create a side input windowed into 1-minute fixed windows, where each window contains a single value of the tag map, derived from the filter file as of some moment inside that window.

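PCollection<Long> ticks = p
  // Produce 1 "tick" per second
  .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
  // Window the ticks into 1-minute windows
  .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))))
  // Use an arbitrary per-window combiner to reduce to 1 element per window.
  // withoutDefaults() is needed because the input is not in the global window.
  // This assumes an SDK where Count.globally() returns Combine.Globally; on
  // newer Beam, use Combine.globally(Count.<Long>combineFn()).withoutDefaults().
  .apply(Count.<Long>globally().withoutDefaults());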

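// Produce a collection of tag maps, one per 1-minute window
PCollectionView<TagMap> tagMapView = ticks
  .apply(MapElements.via((Long ignored) -> {
    ... manually read the json file as a TagMap ...
  // A lambda carries no output type information, so declare it explicitly
  }).withOutputType(new TypeDescriptor<TagMap>() {}))
  .apply(View.<TagMap>asSingleton());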
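
To make "keyed by window" a bit more concrete, here is a small plain-Java sketch (no SDK involved) of how I understand a main-input window gets matched to one of the 1-minute side-input windows: for a fixed-window side input, the window used is the one containing the main window's last timestamp. The class and method names (SideInputWindowMapping, Interval, sideInputWindowFor) are made up for illustration, and the 15-minute window in main is just an example, not a real window from the pipeline above.

```java
import java.time.Duration;
import java.time.Instant;

public class SideInputWindowMapping {

    /** Half-open interval [start, end), standing in for an interval window (hypothetical helper). */
    static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that still belongs to the window, at millisecond granularity. */
        Instant maxTimestamp() {
            return end.minusMillis(1);
        }
    }

    /** The fixed window of the given size, aligned to the epoch, that contains ts. */
    static Interval fixedWindowContaining(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return new Interval(Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs));
    }

    /**
     * Assumed mapping rule: the side-input window for a main window is the
     * fixed window that contains the main window's last timestamp.
     */
    static Interval sideInputWindowFor(Interval mainWindow, Duration sideWindowSize) {
        return fixedWindowContaining(mainWindow.maxTimestamp(), sideWindowSize);
    }

    public static void main(String[] args) {
        // An example 15-minute main-input window.
        Interval mainWindow = new Interval(
            Instant.parse("2020-06-28T08:42:31Z"),
            Instant.parse("2020-06-28T08:57:31Z"));

        Interval side = sideInputWindowFor(mainWindow, Duration.ofMinutes(1));
        System.out.println("side input window: " + side.start + " .. " + side.end);
    }
}
```

Under that assumption, every element of a 15-minute sliding window sees the tag map that was read for the 1-minute window in which the sliding window ends, so the filter a window sees is roughly as fresh as the end of that window.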

This pattern (joining against slowly changing external data as a side input) comes up repeatedly, and the solution I'm proposing here is far from perfect. I wish we had better support for this in the programming model. I've filed a BEAM JIRA issue to track this.