Hadoop HDFS: Read sequence files that are being wr

2019-02-28 08:06发布

问题:

I am using Hadoop 1.0.3.

I write logs to an Hadoop sequence file into HDFS, I call syncFS() after each bunch of logs but I never close the file (except when I am performing daily rolling).

What I want to guarantee is that the file is available to readers while the file is still being written.

I can read the bytes of the sequence file via FSDataInputStream, but if I try to use SequenceFile.Reader.next(key,val), it returns false at the first call.

I know the data is in the file since I can read it with FSDataInputStream or with the cat command and I am 100% sure that syncFS() is called.

I checked the namenode and datanode logs, no error or warning.

Why SequenceFile.Reader is unable to read my currently being written file ?

回答1:

You can't ensure that a read is completely written to disk on the datanode side. You can see this in the documentation of DFSClient#DFSOutputStream.sync() which states:

  All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

So it basically updates the the namenode's block map with the current information and sends the data to the datanode. Since you can't flush the data to disk on the datanode, but you directly read from the datanode you hit a timeframe where the data is somewhere buffered and not accessible. Thus your sequencefile reader will think that the datastream is finished (or empty) and can't read additional bytes returning false to the deserialization process.

A datanode writes the data to disk (it is written beforehand, but not readable from outside) if the block is fully received. So you are able to read from the file once your blocksize has been reached or your file has been closed beforehand and thus finalized a block. Which totally makes sense in a distributed environment, because your writer can die and not finish a block properly- this is a matter of consistency.

So the fix would be to make the blocksize very small so the block is finished more often. But that is not so efficient and I hope it should be clear that your requirement is not suited for HDFS.



回答2:

The reason the SequenceFile.Reader fails to read a file being written is that it uses the file length to perform its magic.

The file length stays at 0 while the first block is being written, and is updated only when the block is full (by default 64MB). Then the file size is stuck at 64MB until the second block is fully written and so on...

That means you can't read the last incomplete block in a sequence file using SequenceFile.Reader, even if the raw data is readable using directly FSInputStream.

Closing the file also fixes the file length, but in my case I need to read files before they are closed.



回答3:

So I hit the same issue and after some investigation and time I figured the following workaround that works.

So the problem is due to internal implementation of sequence file creation and the fact that it is using the file length which is updated per block of 64 MBs.

So I created the following class to create the reader and I wrapped the hadoop FS with my own while I overriding the get length method to return the file length instead:

public class SequenceFileUtil {

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);
    }

    private class WrappedFileSystem extends FileSystem
    {
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs){
            this.nestedFs = fs;
        }

        @Override
        public URI getUri() {
            return nestedFs.getUri();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f,bufferSize);
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);
        }

        @Override
        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
            nestedFs.setWorkingDirectory(new_dir);
        }

        @Override
        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);
        }


        @Override
        public long getLength(Path f) throws IOException {

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength){
                //We might have uncompleted blocks
                return fileLength;
            }

            return length;
        }


    }
}


回答4:

I faced a similar problem, here is how I fixed it: http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E