I'm trying to build a Google Dataflow pipeline that has these steps:
- Read a message containing a filename from a Pub/Sub topic.
- Find the file with that name in a Google Cloud Storage bucket.
- Read each line from the file.
- Send each line, together with the filename, as a single message to another topic.
My problem is that I can't add the filename to the final output message. Current implementation:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // The payload is the file name; decode it explicitly as UTF-8.
            String fileName = new String(c.element().getPayload(), StandardCharsets.UTF_8);
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    // TextIO.readAll() emits only the lines, so the file name is lost here.
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
I saw a similar question asked before here, but it isn't a working solution for me because I have to attach the filename to each output message, not just parse the file line by line. Could anyone suggest possible solutions?
Update
Thanks @jkff, I followed your advice. Here is my current solution:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
    .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // The payload is the file name; turn it into a full GCS path.
            String message = new String(c.element().getPayload(), StandardCharsets.UTF_8);
            c.output("gs://bucket/" + message);
        }
    }))
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws IOException {
            FileIO.ReadableFile f = c.element();
            String filePath = f.getMetadata().resourceId().toString();
            String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
            // Decode as UTF-8 and let BufferedReader split on \n, \r, or \r\n.
            // Unlike a byte-by-byte loop that splits on '\r' only, this also
            // emits a final line that has no trailing terminator.
            try (BufferedReader reader = new BufferedReader(
                    Channels.newReader(f.open(), StandardCharsets.UTF_8.name()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    c.output(line + " " + fileName);
                }
            }
        }
    }))
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
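The per-line tagging done in the ReadFile step can be tried outside Dataflow with plain JDK classes. The following is only a minimal sketch under some assumptions: the files are UTF-8 text, and the class `LineTagger` with its methods `baseName` and `tagLines` are hypothetical names introduced for illustration, not part of Beam. It uses `BufferedReader.readLine()` rather than a hand-written byte loop, so `\n`, `\r`, and `\r\n` all count as line terminators and a last line without a terminator is still emitted.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Beam class): tags every line of a channel with a file name. */
public class LineTagger {

    /** Returns the last path segment, e.g. "gs://bucket/dir/a.txt" gives "a.txt". */
    public static String baseName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Reads UTF-8 text from the channel and returns "<line> <fileName>" for each line. */
    public static List<String> tagLines(ReadableByteChannel channel, String fileName) {
        List<String> out = new ArrayList<>();
        // try-with-resources closes the reader (and the channel) even on failure.
        try (BufferedReader reader = new BufferedReader(
                Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                out.add(line + " " + fileName);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        // Mixed line endings and no terminator after the last line.
        byte[] data = "first\r\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(data));
        for (String s : tagLines(ch, baseName("gs://bucket/dir/sample.txt"))) {
            System.out.println(s);
        }
    }
}
```

Inside the DoFn, the same loop would call `c.output(...)` instead of collecting into a list, with `f.open()` supplying the channel.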