我有文件,其中每个元组跨越多行,例如:
START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.
所以上面是2元组在我的文件。 我写我的UDF是定义的getNext()
如果是开始,然后我将初始化我的元组,其检查功能; 如果是END然后我将返回(从字符串缓冲区)的元组; 否则我将刚才添加的串来串缓冲区。
它可以很好地用于文件大小小于HDFS块大小是64MB(在亚马逊EMR),而它会为尺寸比这大的失败。 我尝试谷歌四周,发现这个博客帖子 。 拉贾的交代是容易理解的,他提供了一个示例代码。 但代码执行RecordReader
部分,而不是getNext()
猪LoadFunc
。 只是想知道如果任何人有经验来处理多行的猪元组分裂问题? 我应该继续实施RecordReader
猪? 如果是这样,怎么样?
谢谢。
你可以预处理你的输入作为盖伊提及或可申请描述的其他技巧在这里 。
我认为最干净的解决办法是实现一个自定义InputFormat (其RecordReader沿),它会创建一个记录/ START-END。 猪的LoadFunc坐在Hadoop的InputFormat的顶部,因此您可以定义InputFormat您LoadFunc会使用。
原始,框架实现自定义LoadFunc会是什么样子的:
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CustomLoader extends LoadFunc {
private RecordReader reader;
private TupleFactory tupleFactory;
public CustomLoader() {
tupleFactory = TupleFactory.getInstance();
}
@Override
public InputFormat getInputFormat() throws IOException {
return new MyInputFormat(); //custom InputFormat
}
@Override
public Tuple getNext() {
Tuple result = null;
try {
if (!reader.nextKeyValue()) {
return null;
}
//value can be a custom Writable containing your name/value
//field pairs for a given record
Object value = reader.getCurrentValue();
result = tupleFactory.newTuple();
// ...
//append fields to tuple
}
catch (Exception e) {
// ...
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit pigSplit)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
}
之后, LoadFunc
初始化InputFormat
及其RecordReader
,它将找到数据的输入位置,并开始获得recordReader的记录,创造所产生的元组(GETNEXT()),直到输入已充分阅读。
在自定义InputFormat一些言论:
我想创建一个自定义的InputFormat其中RecordReader是修改后的版本org.apache.hadoop.mapreduce.lib.input.LineRecordReader
:大多数的方法将保持不变,除了initialize()
它会调用自定义LineReader (基于org.apache.hadoop.util.LineReader
)。 该InputFormat的关键将是行偏移(龙),该值将是一个自定义的可写 。 这将举行一个记录的字段(即START-END之间的数据)键 - 值对的列表。 每当你RecordReader的nextKeyValue()
被调用的记录写入到自定义可写由LineReader。 整个事情的要点是如何实现 LineReader.readLine()
另外,可能是一个更简单的方法是改变的TextInputFormat的分隔符(这是在Hadoop的0.23配置,请参阅textinputformat.record.delimiter
)到一个适合于你的数据结构(如果可能)。 在这种情况下,你最终会在有数据Text
,从中您需要拆分并提取KV对和成元组。
如果可以开始为你的分隔符,也许下面的代码工作没有UDF
SET textinputformat.record.delimiter 'START';
a = load '<input path>' as (data:chararray);
dump a;
输出将如下所示:
(
name: Jim
enter code here`phone: 2128789283
address: 56 2nd street, New York, USA
END
)
(
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
)
现在,这两个被分成两个元。