How to use Dataflow TextIO dynamic destinations i

Posted 2019-02-25 14:53

Question:

Hello, I am very confused by the dynamic file destinations API, and there are no docs, so here I am.

The situation is this: I have a PCollection containing events that belong to different partitions. I want to split them up and write each partition to a different folder in GCS.

Here is what I have.

Dynamic destination object:

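  import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems}
  import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations
  import org.apache.beam.sdk.options.ValueProvider
  import org.json4s.DefaultFormats
  import org.json4s.jackson.Serialization.write // assumed json4s backend; json4s-native works the same way

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {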

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

I believe this is the correct logic: I ask each element for its destination partition, that destination is passed into getFilenamePolicy, and a filename is built from there. To format the record I just convert it to JSON.

The issue is integrating this with TextIO. I tried this:

TextIO
  .write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gs://bucket")) // GCS paths use the gs:// scheme

but it requires the input type to be String. Technically this could work, but I would have to deserialise each record multiple times. I found this in the docs for TextIO dynamic destinations:

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

So let's try that:

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gs://bucket"))

This still doesn't compile, because writeCustomType returns TypedWrite<UserT, Void>, which has the knock-on effect of requiring the second type parameter of my DynamicDestinations object to be Void. Clearly I need it to be a String, or at least something other than Void.

I'm clearly missing something.

Answer 1:

Oh man, this is embarrassing. It turns out writeCustomType().to(DynamicDestinations) was never tested, so we didn't notice that it had a typo in its type signature. PR https://github.com/apache/beam/pull/4319 is in review. You'll need 2.3.0-SNAPSHOT to pick it up, though, and in that case I would still recommend just using FileIO.write().
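For comparison, here is a minimal sketch of the FileIO route, using FileIO.writeDynamic(), the dynamic-destination counterpart of FileIO.write(). It assumes Beam 2.3.0 or later. Event, its toJson method, PartitionOf, EventToJson, PartitionNaming and DynamicFileIOWrite are all hypothetical stand-ins for the question's types, and toJson replaces the question's json4s call. The destination function, the format function and the per-destination file naming are passed separately, so no DynamicDestinations subclass or TextIO type parameter is involved:

```scala
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.{SerializableCoder, StringUtf8Coder}
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.{Contextful, Create, SerializableFunction}
import org.apache.beam.sdk.values.PCollection

// Hypothetical stand-in for the question's Event type.
case class Event(partition: String, payload: String) {
  // Hypothetical JSON rendering (no escaping); swap in the real json4s call.
  def toJson: String = s"""{"partition":"$partition","payload":"$payload"}"""
}

// Destination function: the partition string is the destination key.
class PartitionOf extends SerializableFunction[Event, String] {
  override def apply(e: Event): String = e.partition
}

// Format function: one JSON document per output line.
class EventToJson extends SerializableFunction[Event, String] {
  override def apply(e: Event): String = e.toJson
}

// Naming: files are prefixed "<partition>/part" (a sub-folder per partition,
// resolved under the base directory) and end in ".jsonl".
class PartitionNaming extends SerializableFunction[String, FileIO.Write.FileNaming] {
  override def apply(partition: String): FileIO.Write.FileNaming =
    FileIO.Write.defaultNaming(s"$partition/part", ".jsonl")
}

object DynamicFileIOWrite {
  def writeByPartition(events: PCollection[Event], baseDir: String): Unit = {
    events.apply(
      "WriteByPartition",
      FileIO.writeDynamic[String, Event]()
        .by(new PartitionOf)
        .via(Contextful.fn[Event, String](new EventToJson), TextIO.sink())
        .to(baseDir) // e.g. "gs://bucket/events" on GCS
        .withNaming(new PartitionNaming)
        .withDestinationCoder(StringUtf8Coder.of()))
    ()
  }

  def main(args: Array[String]): Unit = {
    // Default options pick the DirectRunner, assuming it is on the classpath.
    val p = Pipeline.create(PipelineOptionsFactory.create())
    val events = p.apply(
      Create.of(Event("a", "x"), Event("b", "y"))
        .withCoder(SerializableCoder.of(classOf[Event])))
    val out = java.nio.file.Files.createTempDirectory("events").toString
    writeByPartition(events, out)
    p.run().waitUntilFinish()
  }
}
```

The functions are top-level classes rather than anonymous classes inside an object, so nothing non-serializable gets captured when Beam serializes them.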



Answer 2:

It doesn't seem to compile in Scala, but after digging around I was able to get the behaviour I wanted with a similar API:

  var outputTransform =
    TextIO
      .writeCustomType[T]()
      .withFormatFunction(outputFormatter)
      .withNumShards(shards)
      .withTempDirectory(tempDir)
      .withCompression(compression)

  if (windowedWrites) {
    outputTransform = outputTransform.withWindowedWrites()
  }

  // This overload returns TypedWrite[T, DefaultFilenamePolicy.Params], so no Void destination type is involved.
  outputTransform.to(outputFileNamePolicyMapping, emptyDestination)

where outputFormatter is a SerializableFunction from T to String and outputFileNamePolicyMapping is a SerializableFunction from T to DefaultFilenamePolicy.Params.
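To make the two functions concrete, here is a minimal sketch. It assumes a Beam 2.x version in which TextIO.TypedWrite still has the to(SerializableFunction, DefaultFilenamePolicy.Params) overload used above. Event, EventToJson, PartitionToParams, paramsFor and Answer2Write are hypothetical, and the "empty" fallback folder is an illustrative choice, not something from the answer:

```scala
import org.apache.beam.sdk.io.{FileSystems, TextIO}
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params
import org.apache.beam.sdk.transforms.SerializableFunction

// Hypothetical stand-in for T.
case class Event(partition: String, payload: String) {
  // Hypothetical JSON rendering (no escaping).
  def toJson: String = s"""{"partition":"$partition","payload":"$payload"}"""
}

// Hypothetical outputFormatter: T => String.
class EventToJson extends SerializableFunction[Event, String] {
  override def apply(e: Event): String = e.toJson
}

// Hypothetical outputFileNamePolicyMapping: T => DefaultFilenamePolicy.Params.
class PartitionToParams(prefix: String) extends SerializableFunction[Event, Params] {
  override def apply(e: Event): Params = PartitionToParams.paramsFor(prefix, e.partition)
}

object PartitionToParams {
  // Base filename "<prefix>/<partition>/part", treated as a file prefix (isDirectory = false).
  def paramsFor(prefix: String, partition: String): Params =
    new Params()
      .withBaseFilename(FileSystems.matchNewResource(s"$prefix/$partition/part", false))
      .withSuffix(".jsonl")
}

object Answer2Write {
  def build(prefix: String, tempDir: String, shards: Int, windowedWrites: Boolean)
      : TextIO.TypedWrite[Event, Params] = {
    var write = TextIO.writeCustomType[Event]()
      .withFormatFunction(new EventToJson)
      .withNumShards(shards)
      .withTempDirectory(FileSystems.matchNewResource(tempDir, true))
    if (windowedWrites) {
      write = write.withWindowedWrites()
    }
    // Second argument: the destination used for the empty-output case
    // ("empty" is a hypothetical folder name).
    write.to(new PartitionToParams(prefix), PartitionToParams.paramsFor(prefix, "empty"))
  }
}
```

Because the destination is a Params value rather than a String, TextIO can derive the filename policy from it directly. That is likely why this overload sidesteps the Void type parameter problem from the question.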



Answer 3:

Following @jkff's advice on the Apache Beam mailing list, I managed to get it compiling and working this way:

val write = TextIO.writeCustomType[Event].asInstanceOf[TextIO.TypedWrite[Event, String]]
    .to(new MyDynamicDestinations(baseDir))

However, after doing it that way I realised it is more convenient to use DefaultFilenamePolicy.Params instead of String as the destination type. Let me know if you want more information about that bit.
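The answer doesn't show MyDynamicDestinations. Here is a minimal sketch of what a Params-based version could look like. It assumes Beam 2.2.x, where the asInstanceOf cast works around the signature typo from Answer 1; on versions that include the fix, the cast should be unnecessary. Event, ParamsDestinations and ParamsWrite are hypothetical names. DefaultFilenamePolicy.fromParams turns each destination directly into a filename policy, and DefaultFilenamePolicy.ParamsCoder supplies the destination coder:

```scala
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems, TextIO}
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations

// Hypothetical stand-in for the question's Event type.
case class Event(partition: String, payload: String) {
  // Hypothetical JSON rendering (no escaping).
  def toJson: String = s"""{"partition":"$partition","payload":"$payload"}"""
}

// Hypothetical DynamicDestinations keyed by Params instead of String.
class ParamsDestinations(baseDir: String) extends DynamicDestinations[Event, Params, String] {
  // One folder per partition; files start with "part" and end in ".jsonl".
  private def paramsFor(partition: String): Params =
    new Params()
      .withBaseFilename(FileSystems.matchNewResource(s"$baseDir/$partition/part", false))
      .withSuffix(".jsonl")

  override def getDestination(element: Event): Params = paramsFor(element.partition)

  override def getDefaultDestination: Params = paramsFor("default")

  // The destination already carries everything the default policy needs.
  override def getFilenamePolicy(destination: Params): FileBasedSink.FilenamePolicy =
    DefaultFilenamePolicy.fromParams(destination)

  override def formatRecord(record: Event): String = record.toJson

  // Tell Beam how to encode Params destinations.
  override def getDestinationCoder: Coder[Params] = DefaultFilenamePolicy.ParamsCoder.of()
}

object ParamsWrite {
  def build(baseDir: String, tempDir: String): TextIO.TypedWrite[Event, Params] =
    TextIO.writeCustomType[Event]()
      .asInstanceOf[TextIO.TypedWrite[Event, Params]] // work around the Void destination type
      .to(new ParamsDestinations(baseDir))
      .withTempDirectory(FileSystems.matchNewResource(tempDir, true))
}
```

Compared with the String version in the question, the filename logic moves into getDestination, and getFilenamePolicy becomes a one-line delegation.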