How to use FileIO.writeDynamic() in Apache Beam 2.

2019-08-24 07:28发布

问题:

I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...

When reading only a single topic I used TextIO in the last step of my pipeline:

TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1));

This is a similar question, which code I tried to adapt.

FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what this should exactly, but it needs to 
                         // include the EventType into the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1))

The official JavaDoc contains example code which seem to have outdated method signatures. (The .via method seems to have switched the order of the arguments). I' furthermore stumbled across the example in FileIO which confused me - shouldn't TransactionType and Transaction in this line change places?

回答1:

After a night of sleep and a fresh start I figured out the solution, I used the functional Java 8 style as it makes the code shorter (and more readable):

  .apply(
    FileIO.<String, Event>writeDynamic()
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix)
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));

Explanation:

  • Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to, it is parsed in a ParDo after the KafkaIO step
  • suffix is a either dev or empty and set by environment variables
  • currentMillisStringcontains the timestamp when the whole pipeline was launched so that new files don't overwrite old files on GCS when a pipeline gets restarted
  • FileNaming implements a custom naming and receives the type of the event (the topic) in it's constructor, it uses a custom formatter to write to daily partitioned "sub-folders" on GCS:

    class FileNaming implements FileIO.Write.FileNaming {
      static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
        return new FileNaming(topic, suffix, currentMillisString);
      }
    
      private static final DateTimeFormatter FORMATTER = DateTimeFormat
          .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));
    
      private final String topic;
      private final String suffix;
      private final String currentMillisString;
    
      private String filenamePrefixForWindow(IntervalWindow window) {
        return String.format(
            "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
      }
    
      private FileNaming(String topic, String suffix, String currentMillisString) {
        this.topic = topic;
        this.suffix = suffix;
        this.currentMillisString = currentMillisString;
      }
    
      @Override
      public String getFilename(
          BoundedWindow window,
          PaneInfo pane,
          int numShards,
          int shardIndex,
          Compression compression) {
    
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filenamePrefix = filenamePrefixForWindow(intervalWindow);
        String filename =
            String.format(
                "pane-%d-%s-%05d-of-%05d%s",
                pane.getIndex(),
                pane.getTiming().toString().toLowerCase(),
                shardIndex,
                numShards,
                suffix);
        String fullName = filenamePrefix + filename;
        return fullName;
      }
    }