我有一个梁管道与读取多个文本文件,其中一个文件中的每一行代表获取管道后插入Bigtable的一行开始了。 方案要求在确认从行中的每个文件和计数提取行的计数后插入Bigtable的匹配。 对于这个我打算开发一个定制的窗口化策略,这样从一个单一的文件系根据文件名称将被传递到窗口函数的键会被分配给一个窗口。
是否有用于创建自定义窗口函数的任何代码样本?
我有一个梁管道与读取多个文本文件,其中一个文件中的每一行代表获取管道后插入Bigtable的一行开始了。 方案要求在确认从行中的每个文件和计数提取行的计数后插入Bigtable的匹配。 对于这个我打算开发一个定制的窗口化策略,这样从一个单一的文件系根据文件名称将被传递到窗口函数的键会被分配给一个窗口。
是否有用于创建自定义窗口函数的任何代码样本?
虽然我改变了我的策略,以确认所插入的行数,对于任何人谁是有意从一个批次源如读窗口元素FileIO
于批处理作业,这里是创建一个自定义窗口战略的代码:
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{
private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);
@Override
public IntervalWindow assignWindow(Instant timestamp) {
Instant end = new Instant(timestamp.getMillis() + 1);
IntervalWindow interval = new IntervalWindow(timestamp, end);
LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
return interval;
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return this.equals(other);
}
@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
if (!this.isCompatible(other)) {
throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
}
}
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
}
然后它可以在管道下面可以使用:
p
.apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
.apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
请记住,你需要写AssignTimestampFn()
使每个消息中携带的时间戳。