I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...
When reading only a single topic I used TextIO
in the last step of my pipeline:
TextIO.write()
.to(
new DateNamedFiles(
String.format("gs://bucket/data%s/", suffix), currentMillisString))
.withWindowedWrites()
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1));
This is a similar question, which code I tried to adapt.
FileIO.<EventType, Event>writeDynamic()
.by(
new SerializableFunction<Event, EventType>() {
@Override
public EventType apply(Event input) {
return EventType.TRANSFER; // should return real type here, just a dummy
}
})
.via(
Contextful.fn(
new SerializableFunction<Event, String>() {
@Override
public String apply(Event input) {
return "Dummy"; // should return the Event converted to a String
}
}),
TextIO.sink())
.to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
currentMillisString),
new SerializableFunction<String, String>() {
@Override
public String apply(String input) {
return null; // Not sure what this should exactly, but it needs to
// include the EventType into the path
}
}))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1))
The official JavaDoc contains example code which seem to have outdated method signatures. (The .via
method seems to have switched the order of the arguments). I' furthermore stumbled across the example in FileIO
which confused me - shouldn't TransactionType
and Transaction
in this line change places?
After a night of sleep and a fresh start I figured out the solution, I used the functional Java 8 style as it makes the code shorter (and more readable):
Explanation:
Event
is a Java POJO containing the payload of the Kafka message and the topic it belongs to, it is parsed in aParDo
after theKafkaIO
stepsuffix
is a eitherdev
or empty and set by environment variablescurrentMillisString
contains the timestamp when the whole pipeline was launched so that new files don't overwrite old files on GCS when a pipeline gets restartedFileNaming
implements a custom naming and receives the type of the event (the topic) in it's constructor, it uses a custom formatter to write to daily partitioned "sub-folders" on GCS: