I have file where each tuple span multiple lines, for example:
START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.
So above are 2 tuples in my file. I wrote my UDF that defined a getNext()
function which check if it is START then I will initialize my tuple; if it is END then I will return the tuple (from string buffer); otherwise I will just add the string to string buffer.
It works well for file size is less than HDFS block size which is 64 MB (on Amazon EMR), whereas it will fail for the size larger than this. I try to google around, find this blog post. Raja's explaination is easy to understand and he provided a sample code. But the code is implementing the RecordReader
part, instead of getNext()
for pig LoadFunc
. Just wondering if anyone has experience to handle multi-lined pig tuple split problem? Should I go ahead implement RecordReader
in Pig? If so, how?
Thanks.
You may preprocess your input as Guy mentioned or can apply other tricks described here.
I think the cleanest solution would be to implement a custom InputFormat (along with its RecordReader) which creates one record/START-END. The Pig's LoadFunc sits on the top of the Hadoop's InputFormat, so you can define which InputFormat your LoadFunc will use.
A raw, skeleton implementation of a custom LoadFunc would look like:
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CustomLoader extends LoadFunc {
private RecordReader reader;
private TupleFactory tupleFactory;
public CustomLoader() {
tupleFactory = TupleFactory.getInstance();
}
@Override
public InputFormat getInputFormat() throws IOException {
return new MyInputFormat(); //custom InputFormat
}
@Override
public Tuple getNext() {
Tuple result = null;
try {
if (!reader.nextKeyValue()) {
return null;
}
//value can be a custom Writable containing your name/value
//field pairs for a given record
Object value = reader.getCurrentValue();
result = tupleFactory.newTuple();
// ...
//append fields to tuple
}
catch (Exception e) {
// ...
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit pigSplit)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
}
After the LoadFunc
initializes the InputFormat
and its RecordReader
, it locates the input location of your data and begins to obtain the records from recordReader, creates the resulting tuples (getNext()) until the input has been fully read.
Some remarks on the custom InputFormat:
I'd create a custom InputFormat in which the RecordReader is a modified version of
org.apache.hadoop.mapreduce.lib.input.LineRecordReader
: Most of the methods would
remain the same, except initialize()
: it would call a custom LineReader
(based on org.apache.hadoop.util.LineReader
).
The InputFormat's key would be the line offset (Long), the value would be a custom
Writable. This would hold the fields of a record (i.e data between START-END) as a list of key-value pairs. Each time your RecordReader's nextKeyValue()
is called the record is written to the custom Writable by the LineReader. The gist of the whole thing is how you
implement LineReader.readLine()
.
Another, probably an easier approach would be to change the delimiter of TextInputFormat (It is configurable in Hadoop 0.23, see textinputformat.record.delimiter
)
to one that is appropriate for your data structure (if it is possible). In this case you'll end up having your data in Text
from which you need to split and extract KV pairs and into tuples.
If can take start as your delimiter, probably below code works without UDF
SET textinputformat.record.delimiter 'START';
a = load '<input path>' as (data:chararray);
dump a;
the output would look like:
(
name: Jim
enter code here`phone: 2128789283
address: 56 2nd street, New York, USA
END
)
(
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
)
Now both are separated into two tuples.