Google DataFlow: attaching filename to the message

Posted 2019-07-21 19:14

Question:

I'm trying to build a Google Dataflow pipeline with these steps:

  • Read a message containing a filename from a Pub/Sub topic.
  • Find the file with that name in a Google Cloud Storage bucket.
  • Read each line of the file.
  • Send each line, together with the filename, as a single message to another topic.

My problem is that I can't add the filename to the final output message. Current implementation:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);

Pipeline p = Pipeline.create(options);

p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String fileName = new String(c.element().getPayload());
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

p.run().waitUntilFinish();

I saw a similar question asked before here, but it isn't a working solution for me, because I have to attach the filename to each output message, not just parse each line. Could anyone suggest possible solutions?

update

Thanks @jkff, I followed your advice. My current solution:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
            .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String message = new String(c.element().getPayload());
                    c.output("gs://bucket/" + message);
                }
            }))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws IOException {
                    FileIO.ReadableFile f = c.element();

                    String filePath = f.getMetadata().resourceId().toString();
                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);

                    // Requires java.io.BufferedReader, java.nio.channels.Channels
                    // and java.nio.charset.StandardCharsets.
                    // The reader decodes UTF-8 and splits on \n, \r\n or \r, and it also
                    // returns a last line that has no trailing terminator.
                    try (BufferedReader reader = new BufferedReader(
                            Channels.newReader(f.open(), StandardCharsets.UTF_8.name()))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            c.output(line + " " + fileName);
                        }
                    }
                }
            }))
            .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    p.run().waitUntilFinish();

Answer 1:

You can use FileIO: apply FileIO.matchAll() followed by FileIO.readMatches() to get a PCollection<ReadableFile>, where each ReadableFile gives you both the filename and access to the file's contents. Follow it with a DoFn that does what you want. To read the file, use standard Java library facilities on the channel returned by the ReadableFile's open().
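
As a rough illustration of the "standard Java library facilities" part, here is a minimal sketch that uses only the JDK, so it runs without Beam. The class FileLineTagger and the helpers tagLines and baseName are hypothetical names made up for this example, and UTF-8 is an assumption about the file encoding. Inside the DoFn you would presumably pass f.open() as the channel and call c.output(...) for each line instead of collecting into a list.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FileLineTagger {

    // Hypothetical helper: reads every line from the channel and appends the file name.
    // Assumes the file is UTF-8 text; readLine() accepts \n, \r\n and \r as terminators.
    static List<String> tagLines(ReadableByteChannel channel, String fileName) throws IOException {
        List<String> tagged = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                tagged.add(line + " " + fileName);
            }
        }
        return tagged;
    }

    // Hypothetical helper: keeps only the part of the path after the last '/'.
    static String baseName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) throws IOException {
        // An in-memory channel stands in for the one returned by ReadableFile.open().
        byte[] content = "first\r\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(content));
        for (String message : tagLines(channel, baseName("gs://bucket/data/input.txt"))) {
            System.out.println(message);
        }
    }
}
```

Wrapping the channel in a BufferedReader avoids reading one byte at a time, decodes multi-byte characters correctly, and still emits the last line when the file does not end with a line terminator.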