I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...
When reading only a single topic I used TextIO
in the last step of my pipeline:
TextIO.write()
.to(
new DateNamedFiles(
String.format("gs://bucket/data%s/", suffix), currentMillisString))
.withWindowedWrites()
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1));
This is a similar question, which code I tried to adapt.
FileIO.<EventType, Event>writeDynamic()
.by(
new SerializableFunction<Event, EventType>() {
@Override
public EventType apply(Event input) {
return EventType.TRANSFER; // should return real type here, just a dummy
}
})
.via(
Contextful.fn(
new SerializableFunction<Event, String>() {
@Override
public String apply(Event input) {
return "Dummy"; // should return the Event converted to a String
}
}),
TextIO.sink())
.to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
currentMillisString),
new SerializableFunction<String, String>() {
@Override
public String apply(String input) {
return null; // Not sure what this should exactly, but it needs to
// include the EventType into the path
}
}))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1))
The official JavaDoc contains example code which seem to have outdated method signatures. (The .via
method seems to have switched the order of the arguments). I' furthermore stumbled across the example in FileIO
which confused me - shouldn't TransactionType
and Transaction
in this line change places?