How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?

Posted 2019-10-29 22:48

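I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to change the pipeline so that it reads multiple topics and writes them out to gs://bucket/topic/...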

When reading only one topic, I used TextIO in the last step of my pipeline:

.apply(
    TextIO.write()
        // DateNamedFiles is the author's own filename policy class (not shown)
        .to(
            new DateNamedFiles(
                String.format("gs://bucket/data%s/", suffix), currentMillisString))
        .withWindowedWrites()
        .withTempDirectory(
            FileBasedSink.convertToFileResourceIfPossible(
                String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
        .withNumShards(1));
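Both GCS paths above are built with String.format from suffix and currentMillisString, which the answer describes as an environment-dependent suffix (dev or empty) and the timestamp at which the pipeline was launched. A minimal plain-Java sketch of that string building, assuming a hypothetical environment variable named PIPELINE_SUFFIX (the real variable name is not given) and System.currentTimeMillis() as the source of the timestamp:

```java
public class PipelinePaths {

    // Hypothetical variable name: the post only says the suffix comes from an environment variable
    static final String SUFFIX_ENV_VAR = "PIPELINE_SUFFIX";

    // Unset variable -> empty suffix; "dev" -> paths such as gs://bucket/datadev/
    static String suffixOrEmpty(String envValue) {
        return envValue == null ? "" : envValue;
    }

    // Final output location, shared by every run of the pipeline
    static String dataPath(String suffix) {
        return String.format("gs://bucket/data%s/", suffix);
    }

    // Temp location scoped to a single run through the launch timestamp
    static String tempPath(String suffix, String currentMillisString) {
        return String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString);
    }

    public static void main(String[] args) {
        String suffix = suffixOrEmpty(System.getenv(SUFFIX_ENV_VAR));
        // Captured once at launch, so a restarted pipeline gets a new value
        String currentMillisString = String.valueOf(System.currentTimeMillis());
        System.out.println(dataPath(suffix));
        System.out.println(tempPath(suffix, currentMillisString));
    }
}
```

Because currentMillisString changes on every launch, each run gets its own temp directory while the data directory stays the same.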

There is a similar question, and I tried to adapt its code:

.apply(
    FileIO.<EventType, Event>writeDynamic()
        .by(
            new SerializableFunction<Event, EventType>() {
              @Override
              public EventType apply(Event input) {
                return EventType.TRANSFER; // should return real type here, just a dummy
              }
            })
        .via(
            Contextful.fn(
                new SerializableFunction<Event, String>() {
                  @Override
                  public String apply(Event input) {
                    return "Dummy"; // should return the Event converted to a String
                  }
                }),
            TextIO.sink())
        .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                                currentMillisString),
            new SerializableFunction<String, String>() {
              @Override
              public String apply(String input) {
                return null; // Not sure what this should be exactly, but it needs to
                             // include the EventType in the path
              }
            }))
        .withTempDirectory(
            FileBasedSink.convertToFileResourceIfPossible(
                String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
        .withNumShards(1));

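The official JavaDoc contains example code that seems to have outdated method signatures (the .via method appears to have switched the order of its arguments). I also stumbled across the example in FileIO, which confused me: shouldn't TransactionType and Transaction in this line switch places?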
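The working answer calls FileIO.<String, Event>writeDynamic() and then passes .by(...) a SerializableFunction<Event, String>, so the first type argument is the destination key and the second is the element type. If the JavaDoc line reads FileIO.<TransactionType, Transaction>writeDynamic(), it follows the same order (TransactionType as the destination, Transaction as the element) and would not need swapping. The following is a plain-Java mock, not Beam code, that only illustrates this reading of the type parameters; DynamicWriteMock, TransactionType and Transaction are hypothetical names:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical mock of the <DestinationT, UserT> shape; it writes no files and is NOT the Beam API.
public class DynamicWriteMock<DestinationT, UserT> {

    enum TransactionType { DEPOSIT, WITHDRAWAL }

    static final class Transaction {
        final TransactionType type;
        final long amountCents;

        Transaction(TransactionType type, long amountCents) {
            this.type = type;
            this.amountCents = amountCents;
        }
    }

    private Function<UserT, DestinationT> destinationFn;
    private Function<UserT, String> outputFn;

    // Like .by(...): element (UserT) -> destination key (DestinationT)
    DynamicWriteMock<DestinationT, UserT> by(Function<UserT, DestinationT> fn) {
        this.destinationFn = fn;
        return this;
    }

    // Like the first argument of .via(...): element -> line of output text
    DynamicWriteMock<DestinationT, UserT> via(Function<UserT, String> fn) {
        this.outputFn = fn;
        return this;
    }

    // Groups converted lines per destination, keeping first-seen order
    Map<DestinationT, List<String>> write(List<UserT> elements) {
        Map<DestinationT, List<String>> files = new LinkedHashMap<>();
        for (UserT element : elements) {
            files.computeIfAbsent(destinationFn.apply(element), k -> new ArrayList<>())
                 .add(outputFn.apply(element));
        }
        return files;
    }

    static Map<TransactionType, List<String>> demo() {
        // Destination type first, element type second
        return new DynamicWriteMock<TransactionType, Transaction>()
            .by(tx -> tx.type)
            .via(tx -> tx.type + "," + tx.amountCents)
            .write(List.of(
                new Transaction(TransactionType.DEPOSIT, 100),
                new Transaction(TransactionType.WITHDRAWAL, 50),
                new Transaction(TransactionType.DEPOSIT, 20)));
    }

    public static void main(String[] args) {
        // prints {DEPOSIT=[DEPOSIT,100, DEPOSIT,20], WITHDRAWAL=[WITHDRAWAL,50]}
        System.out.println(demo());
    }
}
```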

Answer 1:

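After a night of sleep and a fresh start, I figured out the solution. I used the functional Java 8 style because it makes the code shorter (and more readable):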

  .apply(
    FileIO.<String, Event>writeDynamic()
        // Destination key: the Kafka topic each Event came from
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        // Each Event's payload is written as one line of text
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix))
        // "type" is the destination key, i.e. the topic
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));
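The lambdas passed to .by and .via call input.getTopic() and input.getPayload() on Event, whose source is not shown. A minimal sketch of such a class, assuming it only carries the topic and the payload as strings (the field names and the roundTrip helper are hypothetical). It implements Serializable as one way to give Beam something it can encode between steps, for example with SerializableCoder.of(Event.class); the post does not say which coder was actually used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical shape of the answer's Event POJO: only getTopic() and getPayload() are known from the post.
public class Event implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;   // Kafka topic the message was read from (the destination key)
    private final String payload; // message value, written out as one line of text

    public Event(String topic, String payload) {
        this.topic = Objects.requireNonNull(topic);
        this.payload = Objects.requireNonNull(payload);
    }

    public String getTopic() { return topic; }

    public String getPayload() { return payload; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Event)) return false;
        Event other = (Event) o;
        return topic.equals(other.topic) && payload.equals(other.payload);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, payload); }

    // Java-serialization round trip, which is what a Serializable-based coder relies on
    static Event roundTrip(Event event) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Event) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Event e = new Event("orders", "{\"id\":1}");
        System.out.println(roundTrip(e).equals(e)); // prints true
    }
}
```

In the ParDo after KafkaIO.read() (read with metadata, that is, without .withoutMetadata()), each KafkaRecord exposes getTopic() and getKV(), so an Event could be built as new Event(record.getTopic(), record.getKV().getValue()), assuming string values.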

Explanation:

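  • Event is a Java POJO containing the Kafka message payload and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step
  • suffix is either dev or empty, and is set via an environment variable
  • currentMillisString contains the timestamp at which the whole pipeline was launched, so that new files do not overwrite old files on GCS when the pipeline is restarted
  • FileNaming implements custom naming. It receives the event type (the topic) in its constructor and uses a custom format to write to daily partitioned "sub-folders" on GCS: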

     import java.util.TimeZone;

     import org.apache.beam.sdk.io.Compression;
     import org.apache.beam.sdk.io.FileIO;
     import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
     import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
     import org.apache.beam.sdk.transforms.windowing.PaneInfo;
     import org.joda.time.DateTimeZone;
     import org.joda.time.format.DateTimeFormat;
     import org.joda.time.format.DateTimeFormatter;

     class FileNaming implements FileIO.Write.FileNaming {

       static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
         return new FileNaming(topic, suffix, currentMillisString);
       }

       // Daily "sub-folders" are computed in the Europe/Zurich time zone
       private static final DateTimeFormatter FORMATTER =
           DateTimeFormat.forPattern("yyyy-MM-dd")
               .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));

       private final String topic;
       private final String suffix;
       private final String currentMillisString;

       // Produces "<topic>/<yyyy-MM-dd>/<currentMillisString>_"
       private String filenamePrefixForWindow(IntervalWindow window) {
         return String.format(
             "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
       }

       private FileNaming(String topic, String suffix, String currentMillisString) {
         this.topic = topic;
         this.suffix = suffix;
         this.currentMillisString = currentMillisString;
       }

       @Override
       public String getFilename(
           BoundedWindow window,
           PaneInfo pane,
           int numShards,
           int shardIndex,
           Compression compression) {
         // Assumes the input is windowed into IntervalWindows (e.g. fixed windows)
         IntervalWindow intervalWindow = (IntervalWindow) window;
         String filenamePrefix = filenamePrefixForWindow(intervalWindow);
         String filename =
             String.format(
                 "pane-%d-%s-%05d-of-%05d%s",
                 pane.getIndex(),
                 pane.getTiming().toString().toLowerCase(),
                 shardIndex,
                 numShards,
                 suffix);
         return filenamePrefix + filename;
       }
     }
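To see what names this produces, the following plain-Java sketch mirrors the format strings of filenamePrefixForWindow and getFilename, using java.time instead of Joda-Time and passing the pane index, timing name and shard numbers as plain arguments (FileNameSketch and all values in main are made up). FileIO resolves the returned name against the directory given in .to(...), so the result should end up under gs://bucket/data<suffix>/. The example also shows why the Europe/Zurich zone matters: a window starting at 23:00 UTC on 2019-10-28 lands in the 2019-10-29 folder.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical JDK-only mirror of the answer's FileNaming format; not Beam code.
public class FileNameSketch {

    // Same pattern and zone as the answer's Joda-Time FORMATTER
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Zurich"));

    static String fileName(String topic, Instant windowStart, String currentMillisString,
                           int paneIndex, String timing, int shardIndex, int numShards,
                           String suffix) {
        // "<topic>/<day in Zurich>/<launch millis>_"
        String prefix = String.format("%s/%s/%s_", topic, DAY.format(windowStart), currentMillisString);
        // "pane-<index>-<timing>-<shard>-of-<numShards><suffix>", zero-padded to 5 digits
        return prefix + String.format("pane-%d-%s-%05d-of-%05d%s",
            paneIndex, timing.toLowerCase(Locale.ROOT), shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        // prints orders/2019-10-29/1572386880000_pane-0-on_time-00000-of-00001
        System.out.println(fileName("orders", Instant.parse("2019-10-28T23:00:00Z"),
            "1572386880000", 0, "ON_TIME", 0, 1, ""));
    }
}
```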


Source: How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?