Hadoop的HDFS:阅读正在写入序列文件(Hadoop HDFS: Read sequence

2019-08-07 04:08发布

我使用Hadoop 1.0.3。

我写日志到一个Hadoop的序列文件到HDFS,我叫syncFS()日志的每串之后,但我从来没有关闭该文件(当我每天进行滚动除外)。

我想保证的是,该文件是提供给读者,而该文件仍然被写入。

我可以读通过FSDataInputStream序列文件的字节数,但如果我尝试使用SequenceFile.Reader.next(键,VAL),它在第一次调用返回false。

我知道这个数据是在该文件中,因为我可以FSDataInputStream或cat命令读取它,我100%肯定是syncFS()被调用。

我检查了NameNode和DataNode会记录,没有错误或警告。

为什么SequenceFile.Reader无法读取我目前正在写的文件?

Answer 1:

你不能保证读完全写入磁盘上的数据节点的一面。 你可以的文档中看到这个DFSClient#DFSOutputStream.sync()的规定:

  All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

因此,它基本上更新与当前信息的名称节点的块映射,并将数据发送到数据管理部。 既然你无法将数据刷新到磁盘上的数据节点,但你从数据管理部直接读取你打的时间内对数据进行缓冲的地方,而不是访问。 因此,你的sequencefile读者会认为,数据流结束(或空),并且不能读取返回false的反序列化过程的附加字节。

甲数据节点将数据写入到磁盘(它是事先写入的,但是从外部无法读取)如果块被完全接收。 所以,你可以从文件中读取一旦你的块大小已达到或您的文件已预先关闭,从而完成一个块。 这使得完全分布式环境中的意义,因为你的作家可以死,而不是完成一个块properly-这是一致性的问题。

因此,解决将是使块大小非常小,因此块完成更加频繁。 但事实并非如此有效,我希望它应该清楚,你的要求是不适合HDFS。



Answer 2:

在SequenceFile.Reader无法读取写入文件的原因是,它使用的文件长度来执行它的魔力。

文件长度保持为0,而第一块被写入,并且仅当该块是满被更新(通过默认64MB)。 然后将文件的大小是停留在64MB直到第二块写满等等...

这意味着使用SequenceFile.Reader您无法读取序列文件的最后一个不完整的块,即使直接使用FSInputStream的原始数据是可读的。

关闭文件还修复了文件长度,但对我来说,我需要他们关闭之前读取文件。



Answer 3:

于是我打了同样的问题,经过一番考察和时间我想通了以下解决方法的作品。

所以,问题是由于内部实现序列文件创建的,并且它使用被每64 MB的块更新文件长度的事实。

所以,我创建了下面的类来创建读者和我包裹着我自己的Hadoop的FS,而我重写获取长度方法返回文件长度,而不是:

public class SequenceFileUtil {

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);
    }

    private class WrappedFileSystem extends FileSystem
    {
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs){
            this.nestedFs = fs;
        }

        @Override
        public URI getUri() {
            return nestedFs.getUri();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f,bufferSize);
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);
        }

        @Override
        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
            nestedFs.setWorkingDirectory(new_dir);
        }

        @Override
        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);
        }


        @Override
        public long getLength(Path f) throws IOException {

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength){
                //We might have uncompleted blocks
                return fileLength;
            }

            return length;
        }


    }
}


Answer 4:

我遇到了类似的问题,这是我如何固定它: http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E



文章来源: Hadoop HDFS: Read sequence files that are being written