Cloud Dataflow: reading entire text files rather t

Posted 2020-06-04 11:32

Question:

I'm looking for a way to read whole files, so that each file is read in its entirety into a single String. I want to pass a pattern of JSON text files such as gs://my_bucket/*/*.json and then have a ParDo process each file as a whole.

What's the best approach to it?

Answer 1:

I am going to give the most generally useful answer, even though there are special cases [1] where you might do something different.

I think what you want to do is to define a new subclass of FileBasedSource and use Read.from(<source>). Your source will also include a subclass of FileBasedReader; the source contains the configuration data and the reader actually does the reading.

I think a full description of the API is best left to the Javadoc, but I will highlight the key override points and how they relate to your needs:

  • FileBasedSource#isSplittable() you will want to override and return false. This will indicate that there is no intra-file splitting.
  • FileBasedSource#createForSubrangeOfFile(String, long, long) you will override to return a sub-source for just the file specified.
  • FileBasedSource#createSingleFileReader() you will override to produce a FileBasedReader for the current file (the method should assume it is already split to the level of a single file).

To implement the reader:

  • FileBasedReader#startReading(...) you will override to do nothing; the framework will already have opened the file for you, and it will close it.
  • FileBasedReader#readNextRecord() you will override to read the entire file as a single element.
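
To make the last point concrete: reading the entire file as a single element comes down to draining the channel the framework has opened and decoding the bytes once at the end. The sketch below uses only the JDK and is not Dataflow SDK code. The names WholeChannelReader and readAll are made up for illustration, and UTF-8 is an assumed encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Dataflow SDK.
final class WholeChannelReader {
    private WholeChannelReader() {}

    // Reads from the channel until end-of-stream and returns everything
    // decoded as UTF-8 (an assumption; use the charset your files need).
    static String readAll(ReadableByteChannel channel) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8192);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (channel.read(buf) >= 0) {
            buf.flip();                              // switch from filling to draining
            out.write(buf.array(), 0, buf.limit());  // copy the bytes just read
            buf.clear();                             // make room for the next read
        }
        // Decode once at the end so multi-byte characters are never split
        // across buffer boundaries.
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In a reader, readNextRecord() would call something like this the first time it is invoked, keep the result as the current element, and return false on the next call.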

[1] One easy special case: if you have a small number of files, can expand the pattern before job submission, and every file takes about the same time to process, you can just use Create.of(expand(<glob>)) followed by ParDo(<read a file>).
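
The expand(<glob>) step in [1] is left undefined. As a rough stand-in, and assuming ordinary local files rather than gs:// paths (expanding a GCS pattern needs the SDK's or GCS client's own listing calls, which are not shown here), a pattern like */*.json could be expanded with the JDK alone. LocalGlob and expand are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: expands a glob against the LOCAL filesystem only.
final class LocalGlob {
    private LocalGlob() {}

    // Returns, in sorted order, every regular file under root whose path
    // relative to root matches the glob (for example "*/*.json").
    static List<String> expand(Path root, String glob) throws IOException {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> matcher.matches(root.relativize(p)))
                .map(Path::toString)
                .sorted()
                .collect(Collectors.toList());
        }
    }
}
```

The resulting list is what would be handed to Create.of(...) before applying the ParDo that reads each file. Note that in Java glob syntax a single * does not cross directory boundaries, so */*.json matches files exactly one directory below the root.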



Answer 2:

I was looking for a similar solution myself. Following Kenn's recommendations and a few other references such as XMLSource.java, I created the following custom source, which seems to be working fine.

I am not a developer, so if anyone has suggestions on how to improve it, please feel free to contribute.

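// Imports assume the Dataflow SDK for Java 1.x (com.google.cloud.dataflow.sdk.*).
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.FileBasedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileIO {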
// Match TextIO.
public static Read.Bounded<KV<String,String>> readFilepattern(String filepattern) {
    return Read.from(new FileSource(filepattern, 1));
}

public static class FileSource extends FileBasedSource<KV<String,String>> {
    private String filename = null;

    public FileSource(String fileOrPattern, long minBundleSize) {
        super(fileOrPattern, minBundleSize);
    }

    public FileSource(String filename, long minBundleSize, long startOffset, long endOffset) {
        super(filename, minBundleSize, startOffset, endOffset);
        this.filename = filename;
    }

    // This will indicate that there is no intra-file splitting.
    @Override
    public boolean isSplittable(){
        return false;
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return false;
    }

    @Override
    public void validate() {}

    @Override
    public Coder<KV<String,String>> getDefaultOutputCoder() {
        return KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of());
    }

    @Override
    public FileBasedSource<KV<String,String>> createForSubrangeOfFile(String fileName, long start, long end) {
        return new FileSource(fileName, getMinBundleSize(), start, end);
    }

    @Override
    public FileBasedReader<KV<String,String>> createSingleFileReader(PipelineOptions options) {
        return new FileReader(this);
    }
}

/**
 * A reader that reads an entire text file from a {@link FileSource} as a single record.
 */
private static class FileReader extends FileBasedSource.FileBasedReader<KV<String,String>> {
    private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
    private ReadableByteChannel channel = null;
    private long nextOffset = 0;
    private long currentOffset = 0;
    private boolean isAtSplitPoint = false;
    private final ByteBuffer buf;
    private static final int BUF_SIZE = 1024;
    private KV<String,String> currentValue = null;
    private String filename;

    public FileReader(FileSource source) {
        super(source);
        buf = ByteBuffer.allocate(BUF_SIZE);
        buf.flip();
        this.filename = source.filename;
    }

    // Drains the channel into "out" and returns the number of bytes copied.
    // Copies a whole buffer at a time instead of one byte per iteration.
    private int readFile(ByteArrayOutputStream out) throws IOException {
        int byteCount = 0;
        buf.clear();
        while (channel.read(buf) >= 0) {
            buf.flip();
            out.write(buf.array(), 0, buf.limit());
            byteCount += buf.limit();
            buf.clear();
        }
        return byteCount;
    }

    @Override
    protected void startReading(ReadableByteChannel channel) throws IOException {
        this.channel = channel;
    }

    @Override
    protected boolean readNextRecord() throws IOException {
        currentOffset = nextOffset;

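        // Named "contents" to avoid shadowing the ByteBuffer field "buf".
        ByteArrayOutputStream contents = new ByteArrayOutputStream();
        int offsetAdjustment = readFile(contents);
        if (offsetAdjustment == 0) {
            // EOF (or an empty file): nothing left to emit.
            return false;
        }
        nextOffset += offsetAdjustment;
        isAtSplitPoint = true;
        // Emit one (filename, whole file contents) pair.
        currentValue = KV.of(this.filename,
            CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), contents.toByteArray()));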
        return true;
    }

    @Override
    protected boolean isAtSplitPoint() {
        return isAtSplitPoint;
    }

    @Override
    protected long getCurrentOffset() {
        return currentOffset;
    }

    @Override
    public KV<String,String> getCurrent() throws NoSuchElementException {
        return currentValue;
    }
}

}


Answer 3:

A much simpler method is to generate the list of filenames and write a function to process each file individually. I'm showing Python, but Java is similar:

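import apache_beam as beam

def generate_filenames():
  # One path per shard; the naming scheme is only an example.
  for shard in range(0, 300):
    yield 'gs://bucket/some/dir/myfilname-%05d-of-00300' % shard

# readfile(filename) is your own function; it should return an iterable
# of output elements for that file.
with beam.Pipeline(...) as p:
  (p | beam.Create(list(generate_filenames()))
     | beam.FlatMap(lambda filename: readfile(filename))
     | ...)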
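
Since the answer says Java is similar, here is a rough Java counterpart of the per-file function that readfile stands for. It is only a sketch under loud assumptions: it reads from the local filesystem, so a gs:// path would need a GCS-aware client instead, UTF-8 is assumed, and WholeFile and readFile are names invented here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper: reads one LOCAL file completely into memory.
final class WholeFile {
    private WholeFile() {}

    // Returns a (filename, contents) pair so downstream steps still know
    // which file each string came from.
    static Map.Entry<String, String> readFile(String filename) throws IOException {
        byte[] bytes = Files.readAllBytes(Paths.get(filename));
        return new SimpleImmutableEntry<>(filename, new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Inside a pipeline, the body of the per-element function would call something like this for each filename it receives and emit the pair. Each file must fit in a worker's memory for this to work.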