我使用Hadoop 1.0.3。
我写日志到一个Hadoop的序列文件到HDFS,我叫syncFS()日志的每串之后,但我从来没有关闭该文件(当我每天进行滚动除外)。
我想保证的是,该文件是提供给读者,而该文件仍然被写入。
我可以读通过FSDataInputStream序列文件的字节数,但如果我尝试使用SequenceFile.Reader.next(键,VAL),它在第一次调用返回false。
我知道这个数据是在该文件中,因为我可以FSDataInputStream或cat命令读取它,我100%肯定是syncFS()被调用。
我检查了NameNode和DataNode会记录,没有错误或警告。
为什么SequenceFile.Reader无法读取我目前正在写的文件?
你不能保证读完全写入磁盘上的数据节点的一面。 你可以的文档中看到这个DFSClient#DFSOutputStream.sync()
的规定:
All data is written out to datanodes. It is not guaranteed that data has
been flushed to persistent store on the datanode. Block allocations are
persisted on namenode.
因此,它基本上更新与当前信息的名称节点的块映射,并将数据发送到数据管理部。 既然你无法将数据刷新到磁盘上的数据节点,但你从数据管理部直接读取你打的时间内对数据进行缓冲的地方,而不是访问。 因此,你的sequencefile读者会认为,数据流结束(或空),并且不能读取返回false的反序列化过程的附加字节。
甲数据节点将数据写入到磁盘(它是事先写入的,但是从外部无法读取)如果块被完全接收。 所以,你可以从文件中读取一旦你的块大小已达到或您的文件已预先关闭,从而完成一个块。 这使得完全分布式环境中的意义,因为你的作家可以死,而不是完成一个块properly-这是一致性的问题。
因此,解决将是使块大小非常小,因此块完成更加频繁。 但事实并非如此有效,我希望它应该清楚,你的要求是不适合HDFS。
在SequenceFile.Reader无法读取写入文件的原因是,它使用的文件长度来执行它的魔力。
文件长度保持为0,而第一块被写入,并且仅当该块是满被更新(通过默认64MB)。 然后将文件的大小是停留在64MB直到第二块写满等等...
这意味着使用SequenceFile.Reader您无法读取序列文件的最后一个不完整的块,即使直接使用FSInputStream的原始数据是可读的。
关闭文件还修复了文件长度,但对我来说,我需要他们关闭之前读取文件。
于是我打了同样的问题,经过一番考察和时间我想通了以下解决方法的作品。
所以,问题是由于内部实现序列文件创建的,并且它使用被每64 MB的块更新文件长度的事实。
所以,我创建了下面的类来创建读者和我包裹着我自己的Hadoop的FS,而我重写获取长度方法返回文件长度,而不是:
public class SequenceFileUtil {
public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {
WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));
return new SequenceFile.Reader(fileSystem, path, conf);
}
private class WrappedFileSystem extends FileSystem
{
private final FileSystem nestedFs;
public WrappedFileSystem(FileSystem fs){
this.nestedFs = fs;
}
@Override
public URI getUri() {
return nestedFs.getUri();
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return nestedFs.open(f,bufferSize);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
return nestedFs.append(f, bufferSize, progress);
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return nestedFs.rename(src, dst);
}
@Override
public boolean delete(Path path) throws IOException {
return nestedFs.delete(path);
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return nestedFs.delete(f, recursive);
}
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
return nestedFs.listStatus(f);
}
@Override
public void setWorkingDirectory(Path new_dir) {
nestedFs.setWorkingDirectory(new_dir);
}
@Override
public Path getWorkingDirectory() {
return nestedFs.getWorkingDirectory();
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return nestedFs.mkdirs(f, permission);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return nestedFs.getFileStatus(f);
}
@Override
public long getLength(Path f) throws IOException {
DFSClient.DFSInputStream open = new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
long fileLength = open.getFileLength();
long length = nestedFs.getLength(f);
if (length < fileLength){
//We might have uncompleted blocks
return fileLength;
}
return length;
}
}
}
我遇到了类似的问题,这是我如何固定它: http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E