在Hadoop中的MapReduce创建自定义InputFormat和RecordReader对二进

2019-07-29 07:58发布

我正在写处理写的看起来是这样的二进制格式大的时间序列数据文件M / R作业(这里可读性新的生产线,实际的数据是连续的,很明显):

TIMESTAMP_1---------------------TIMESTAMP_1
TIMESTAMP_2**********TIMESTAMP_2 
TIMESTAMP_3%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%TIMESTAMP_3
.. etc

其中时间戳是一个简单的8字节结构,识别为例如由前2个字节。 实际的数据是重复的值时间戳之间界定,如上面显示的,并包含一个或多个预定义的结构。 我想编写一个自定义InputFormat将发射键/值对映射器:

< TIMESTAMP_1, --------------------- >
< TIMESTAMP_2, ********** >
< TIMESTAMP_3, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% >

从逻辑上讲,我想跟踪当前的TIMESTAMP ,并且直到聚集所有数据TIMESTAMP再次检测,然后把我的<TIMESTAMP, DATA>对作为记录。 我的问题是内部分割之间的同步RecordReader ,因此,如果在一定读取器接收以下分裂

# a split occurs inside my data
reader X: TIMESTAMP_1--------------
reader Y: -------TIMESTAMP_1 TIMESTAMP_2****..

# or inside the timestamp
or even: @@@@@@@TIMES
         TAMP_1-------------- ..

什么是解决这个的好办法? 难道我有一个简单的方法来访问文件偏移,使得我CustomRecordReader可以拆分之间同步和不丢失数据? 我觉得我有分裂是如何处理的一些概念上的差距,所以也许这些解释可能会有帮助。 谢谢。

Answer 1:

一般来说,不能简单地创建支持拆分输入格式,因为你应该能够找到从哪里分割边界移动到获得一致的记录。 XmlInputFormat是这样做的格式很好的例子。
我建议首先考虑,如果你确实需要裂开的投入? 你可以定义你的输入格式不裂开的,并没有这些问题。
如果你的文件一般都不会远远大于块的大小 - 你什么都不会。 如果他们这样做 - 你将失去的数据局部性的一部分。



Answer 2:

你也可以继承的具体子类FileInputFormat ,例如, SeqenceFileAsBinaryInputFormat ,并覆盖isSplitable()方法返回false

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;

public class NonSplitableBinaryFile extends SequenceFileAsBinaryInputFormat{

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
      return false;
  }

  @Override
  public RecordReader getRecordReader(InputSplit split, JobConf job,
  Reporter reporter) throws IOException {
    //return your customized record reader here
  }
}


文章来源: Creating custom InputFormat and RecordReader for Binary Files in Hadoop MapReduce