I have a Beam pipeline that starts off with reading multiple text files where each line in a file represents a row that gets inserted into Bigtable later in the pipeline. The scenario requires confirming that the count of rows extracted from each file & count of rows later inserted into Bigtable match. For this I am planning to develop a custom Windowing strategy so that lines from a single file get assigned to a single window based on the file name as the key that will be passed to the Windowing function.
Is there any code sample for creating custom Windowing functions?
Although I changed my strategy for confirming the inserted number of rows, for anyone who is interested in windowing elements read from a batch source e.g. FileIO
in a batch job, here's the code for creating a custom windowing strategy:
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{
private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);
@Override
public IntervalWindow assignWindow(Instant timestamp) {
Instant end = new Instant(timestamp.getMillis() + 1);
IntervalWindow interval = new IntervalWindow(timestamp, end);
LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
return interval;
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return this.equals(other);
}
@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
if (!this.isCompatible(other)) {
throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
}
}
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
}
and then it can be used in the pipeline as below:
p
.apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
.apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
Please keep in mind that you will need to write the AssignTimestampFn()
so that each message carries a timestamp.