Windowing with Apache Beam - Fixed Windows Don'

Posted 2020-02-26 08:33

Question:

We are attempting to use fixed windows on an Apache Beam pipeline (using DirectRunner). Our flow is as follows:

  1. Pull data from Pub/Sub
  2. Deserialize the JSON into a Java object
  3. Window events with fixed windows of 5 seconds
  4. Using a custom CombineFn, combine each window of Events into a List<Event>
  5. For the sake of testing, simply output the resulting List<Event>
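As a rough illustration of what step 3 asks for: a fixed window is chosen from each element's timestamp, not from the moment the element is processed. The sketch below is plain Java rather than Beam code. It assumes windows aligned to the epoch (no offset) and uses `java.time` types instead of the Joda types Beam uses. The class and method names are made up for this example.

```java
import java.time.Duration;
import java.time.Instant;

class FixedWindowSketch {
    // Start of the fixed window that contains the given timestamp, assuming
    // windows are aligned to the epoch (offset zero). Each window covers
    // [start, start + size).
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        for (long ms : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            Instant start = windowStart(Instant.ofEpochMilli(ms), size);
            System.out.println(ms + " ms -> [" + start.toEpochMilli()
                + ", " + start.plus(size).toEpochMilli() + ") ms");
        }
    }
}
```

With 5-second windows, an element stamped at 12345 ms lands in the window [10000, 15000) ms, whenever the pipeline happens to process it.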

Pipeline code:

    pipeline
        // Read from pubsub topic to create unbounded PCollection
        .apply(PubsubIO
            .<String>read()
            .topic(options.getTopic())
            .withCoder(StringUtf8Coder.of())
        )

        // Deserialize JSON into Event object
        .apply("ParseEvent", ParDo
            .of(new ParseEventFn())
        )

        // Window events with a fixed window size of 5 seconds
        .apply("Window", Window
            .<Event>into(FixedWindows
                .of(Duration.standardSeconds(5))
            )
        )

        // Group events by window
        .apply("CombineEvents", Combine
            .globally(new CombineEventsFn())
            .withoutDefaults()
        )

        // Log grouped events
        .apply("LogEvent", ParDo
            .of(new LogEventFn())
        );

The result we are seeing is that the final step never runs: we don't get any log output.

Also, we have added System.out.println("***") in each method of our custom CombineFn class, in order to track when these are run, and it seems they don't run either.
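The CombineEventsFn source is not shown in this post, so the following is only a guess at its general shape. It is plain Java: `CombineLike` is a hypothetical local stand-in that mirrors the four methods of Beam's `Combine.CombineFn` (`createAccumulator`, `addInput`, `mergeAccumulators`, `extractOutput`), and plain strings stand in for Event objects. The driver method imitates one way a runner might call those methods. It is not how any particular runner actually schedules them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListCombineSketch {
    // Hypothetical stand-in with the same four methods as Beam's CombineFn.
    interface CombineLike<InputT, AccumT, OutputT> {
        AccumT createAccumulator();
        AccumT addInput(AccumT accumulator, InputT input);
        AccumT mergeAccumulators(Iterable<AccumT> accumulators);
        OutputT extractOutput(AccumT accumulator);
    }

    // Collects every input into a list; the accumulator is the list itself.
    static class ToList<T> implements CombineLike<T, List<T>, List<T>> {
        public List<T> createAccumulator() {
            return new ArrayList<>();
        }

        public List<T> addInput(List<T> accumulator, T input) {
            accumulator.add(input);
            return accumulator;
        }

        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
            List<T> merged = new ArrayList<>();
            for (List<T> partial : accumulators) {
                merged.addAll(partial);
            }
            return merged;
        }

        public List<T> extractOutput(List<T> accumulator) {
            return accumulator;
        }
    }

    // Imitates two bundles being accumulated separately and then merged.
    static List<String> combineInTwoBundles(List<String> first, List<String> second) {
        ToList<String> fn = new ToList<>();
        List<String> a = fn.createAccumulator();
        for (String s : first) {
            a = fn.addInput(a, s);
        }
        List<String> b = fn.createAccumulator();
        for (String s : second) {
            b = fn.addInput(b, s);
        }
        return fn.extractOutput(fn.mergeAccumulators(Arrays.asList(a, b)));
    }

    public static void main(String[] args) {
        System.out.println(combineInTwoBundles(
            Arrays.asList("event-1", "event-2"), Arrays.asList("event-3")));
    }
}
```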

Is windowing set up incorrectly here? We followed an example found at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but clearly there is something fundamental missing.

Any insight is appreciated - thanks in advance!

Answer 1:

Looks like the main issue was indeed a missing trigger - the window was opening and there was nothing telling it when to emit results. We wanted to simply window based on processing time (not event time) and so did the following:

    .apply("Window", Window
        .<Event>into(new GlobalWindows())
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
    )

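Essentially this puts everything into a single global window whose trigger fires 5 seconds (of processing time) after the first element of a pane arrives. The global window itself never closes: each firing emits the elements collected so far as a pane, and because of discardingFiredPanes() that pane is then dropped, so the next element to arrive starts a fresh pane and a fresh 5-second timer. Beam complained when we didn't have the withAllowedLateness piece - as far as I know this just tells it to ignore any late data.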
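To make the firing pattern concrete, here is a small plain-Java simulation of it. This is not Beam code and the names are invented for the example. It assumes arrival times are given in ascending order, treats an element that arrives exactly at the firing instant as belonging to the next pane, and flushes the last pane at the end of the input (a real unbounded pipeline would simply fire it when its timer expires).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PaneSimulation {
    // Groups processing-time arrival instants (ms, ascending) into panes.
    // A pane's timer starts at its first element and fires delayMillis later;
    // after a firing, the pane is discarded and the next element starts a new one.
    static List<List<Long>> panes(List<Long> arrivalMillis, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long fireAt = 0L;
        for (long t : arrivalMillis) {
            // The timer of the open pane expired before this element arrived.
            if (!current.isEmpty() && t >= fireAt) {
                fired.add(current);
                current = new ArrayList<>();
            }
            // First element of a pane starts the timer.
            if (current.isEmpty()) {
                fireAt = t + delayMillis;
            }
            current.add(t);
        }
        // Flush the pane that is still open when the input ends.
        if (!current.isEmpty()) {
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(0L, 1_000L, 4_000L, 7_000L, 8_000L, 20_000L);
        System.out.println(panes(arrivals, 5_000L));
    }
}
```

Note that the panes are not aligned to wall-clock 5-second boundaries: the second pane here starts at 7000 ms because that is when its first element arrived, which is the main behavioral difference from fixed windows.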

My understanding may be a bit off the mark here, but the above snippet has solved our problem!