我使用Apache 2.6光束从单一的卡夫卡话题读写输出到谷歌云存储(GCS)。 现在我想,使其读取多个主题,写出来以改变管道gs://bucket/topic/...
当读取只有一个话题我用TextIO
在我的管道的最后一步:
TextIO.write()
.to(
new DateNamedFiles(
String.format("gs://bucket/data%s/", suffix), currentMillisString))
.withWindowedWrites()
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1));
这是一个类似的问题,我试着去适应它的代码。
FileIO.<EventType, Event>writeDynamic()
.by(
new SerializableFunction<Event, EventType>() {
@Override
public EventType apply(Event input) {
return EventType.TRANSFER; // should return real type here, just a dummy
}
})
.via(
Contextful.fn(
new SerializableFunction<Event, String>() {
@Override
public String apply(Event input) {
return "Dummy"; // should return the Event converted to a String
}
}),
TextIO.sink())
.to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
currentMillisString),
new SerializableFunction<String, String>() {
@Override
public String apply(String input) {
return null; // Not sure what this should exactly, but it needs to
// include the EventType into the path
}
}))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
.withNumShards(1))
在官方的JavaDoc包含示例代码,这似乎有过时的方法签名。 (该.via
方法似乎已切换的参数的顺序)。 我还跨在例如跌跌撞撞FileIO
不应该-这让我感到困惑TransactionType
和Transaction
这一行改变的地方?