Hello, I am very confused by the dynamic file destinations API, and I could not find much documentation, so here I am.
The situation is that I have a PCollection containing events that belong to different partitions. I want to split them up and write them to different folders in GCS.
Here is what I have.
Dynamic destination object:
import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems}
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations
import org.apache.beam.sdk.options.ValueProvider

class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

  // Each event reports its own partition, a string that is a GCS folder path.
  override def getDestination(element: Event): String = {
    element.partition
  }

  // Builds the filename policy for one destination: <prefix>/<destination>/part-*.jsonl
  override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
    println(destination)
    val overallPrefix = s"$prefix/$destination/part-"
    DefaultFilenamePolicy.fromStandardParameters(
      ValueProvider.StaticValueProvider.of(
        FileSystems.matchNewResource(overallPrefix, false)),
      null, ".jsonl", true)
  }

  // Serializes the event to a JSON line.
  override def formatRecord(record: Event): String = {
    implicit val f = DefaultFormats
    write(record.toDataLakeFormat())
  }

  override def getDefaultDestination: String = "default"
}
I believe this is the correct logic: I ask each element what its destination partition is, that destination gets passed into getFilenamePolicy, and from there a file name is built. To format the record I just convert it to JSON.
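To make the intended routing concrete, here is a minimal plain-Scala sketch with no Beam involved. The Event case class, the route helper, and the gs://example-bucket prefix are hypothetical stand-ins for illustration only; the sketch just shows which file prefix each element would end up under if the logic above behaves the way I expect.

```scala
// Hypothetical stand-in for the real Event type; only the partition matters here.
final case class Event(partition: String, payload: String)

object DestinationSketch {
  // Mirrors the string built in getFilenamePolicy: <prefix>/<destination>/part-
  def filePrefix(prefix: String, destination: String): String =
    s"$prefix/$destination/part-"

  // Groups events by the file prefix that their partition maps to.
  def route(prefix: String, events: Seq[Event]): Map[String, Seq[Event]] =
    events.groupBy(e => filePrefix(prefix, e.partition))

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("dt=2017-10-01", "a"),
      Event("dt=2017-10-02", "b"),
      Event("dt=2017-10-01", "c")
    )
    // Print each file prefix with the payloads routed to it.
    route("gs://example-bucket", events).foreach { case (p, es) =>
      println(s"$p <- ${es.map(_.payload).mkString(",")}")
    }
  }
}
```

Events that share a partition land under the same prefix, which is the behaviour I am hoping to get from the DynamicDestinations object.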
The issue is integrating this with TextIO. I tried this:
TextIO
  .write()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))
However, this requires the input type to be String. Technically this could work, but I would have to deserialise multiple times. I found this in the docs for TextIO dynamic destinations:
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
So let's try that:
TextIO
  .writeCustomType[Event]()
  .withWindowedWrites()
  .withTempDirectory(tempDir)
  .to(new GCSDestinationString("gcs://bucket"))
This still doesn't compile, as writeCustomType internally returns TypedWrite<UserT, Void>, which has the knock-on effect of requiring the second type parameter of my dynamic destinations object to be Void. Clearly I need it to be a String, or at least something other than Void.
I'm clearly missing something.