I'm trying to build Google DataFlow pipeline, which has these steps:
- Read from pub/sub topic a message which contains filename.
- Find in the google bucket file from filename
- read each line from the file
- send each line with filename as a single message to another topic
My problem is that I can't add filename to the final output message. Current implementation:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
.apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String fileName = new String(c.element().getPayload());
c.output("gs://bucket-name/" + fileName);
}
}))
.apply("ReadLines", TextIO.readAll())
.apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
I saw similar question asked before here but it's not really a working solution for me because I have to attach filename to each output message not just parse per each line. Could anyone please let me know about possible solutions?
update
Thanks @jkff, I followed your advice and my current solution code:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
.apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String message = new String(c.element().getPayload());
c.output("gs://bucket/" + message);
}
}))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
FileIO.ReadableFile f = c.element();
String filePath = f.getMetadata().resourceId().toString();
String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
ReadableByteChannel inChannel = f.open();
ByteBuffer buffer = ByteBuffer.allocate(1);
StringBuffer line = new StringBuffer();
while (inChannel.read(buffer) > 0) {
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
char ch = ((char) buffer.get());
if (ch == '\r') {
c.output(line.toString() + " " + fileName);
line = new StringBuffer();
} else {
line.append(ch);
}
}
buffer.clear();
}
inChannel.close();
}
}))
.apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
You can use
FileIO
- useFileIO.matchAll()
followed byFileIO.readMatches()
to get aPCollection<ReadableFile>
, where eachReadableFile
can be used to get the filename and to read the file. Follow it by aDoFn
that does what you want. To read the file, use standard Java library facilities on theReadableFile
's.open()
.