I have a plain-text file with possibly millions of lines that needs custom parsing, and I want to load it into an HBase table as fast as possible (using Hadoop or the HBase Java client).
My current solution is based on a MapReduce job without the reduce phase. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. There the line is parsed into a Put object, which is written to the context. TableOutputFormat then takes the Put object and inserts it into the table.
This solution yields an average insertion rate of 1,000 rows per second, which is less than I expected. My HBase setup runs in pseudo-distributed mode on a single server.
One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?
Here is the code for my current solution:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse the raw text line into name -> value pairs.
        Map<String, String> parsedLine = parseLine(value.toString());
        // The row key is the parsed value stored under keys[1].
        byte[] rowKey = Bytes.toBytes(parsedLine.get(keys[1]));
        Put row = new Put(rowKey);
        for (Map.Entry<String, String> entry : parsedLine.entrySet()) {
            // Each parsed key is used as both column family and qualifier.
            byte[] column = Bytes.toBytes(entry.getKey());
            row.add(column, column, Bytes.toBytes(entry.getValue()));
        }
        // Let InterruptedException propagate instead of swallowing it.
        context.write(new ImmutableBytesWritable(rowKey), row);
    }
}
public int run(String[] args) throws Exception {
    // Expects two arguments: the input path and the output table name.
    if (args.length != 2) {
        return -1;
    }
    conf.set("hbase.mapred.outputtable", args[1]);
    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");
    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    // Map-only job: the Puts go straight to TableOutputFormat.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);
    // Report a non-zero exit code if the job fails.
    return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
    long startTime = System.currentTimeMillis();
    System.out.println("Start time : " + startTime);
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);
    long endTime = System.currentTimeMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime - startTime));
    System.exit(errCode);
}
I've gone through a process that is probably very similar to yours, trying to find an efficient way to load data from an MR job into HBase. What I found to work is using HFileOutputFormat as the OutputFormatClass of the MR job.

Below is the basis of the code I use to generate the job, along with the Mapper map function that writes out the data. This was fast. We don't use it anymore, so I don't have numbers on hand, but it was around 2.5 million records in under a minute.

Here is the (stripped down) function I wrote to generate the job for my MapReduce process to put data into HBase:

This is my map function from the HiveToHBaseMapper class (slightly edited):

I'm pretty sure this isn't going to be a copy-and-paste solution for you. Obviously the data I was working with here didn't need any custom processing (that was done in an MR job before this one). The main thing I want you to take from this is HFileOutputFormat. The rest is just an example of how I used it. :)

I hope it gets you onto a solid path to a good solution.
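To put the two figures in this thread side by side, here is a minimal back-of-the-envelope sketch in plain Java. It takes the 1,000 rows per second reported in the question and treats "2.5 million records in under a minute" as a lower bound of 2,500,000 rows in 60 seconds. The class and method names are hypothetical, and the two numbers come from different hardware and data, so this is only a rough comparison, not a benchmark.

```java
public class LoadRateEstimate {

    /** Average throughput in rows per second for a finished load. */
    public static double rowsPerSecond(long rows, double elapsedSeconds) {
        return rows / elapsedSeconds;
    }

    /** Time in seconds to load the given number of rows at a steady rate. */
    public static double secondsToLoad(long rows, double rowsPerSecond) {
        return rows / rowsPerSecond;
    }

    public static void main(String[] args) {
        long rows = 1_000_000L; // row count mentioned in the question

        // Rate reported in the question for the TableOutputFormat job.
        double putRate = 1_000.0;

        // Assumption: "under a minute" is taken as exactly 60 seconds,
        // so this is a lower bound on the HFileOutputFormat rate.
        double hfileRate = rowsPerSecond(2_500_000L, 60.0);

        System.out.printf("Put-based load:   %.0f rows/s, %.0f s for %d rows%n",
                putRate, secondsToLoad(rows, putRate), rows);
        System.out.printf("HFile-based load: %.0f rows/s, %.0f s for %d rows%n",
                hfileRate, secondsToLoad(rows, hfileRate), rows);
    }
}
```

Under these assumptions, the million-row load takes roughly 1,000 seconds with the Put-based job and about 24 seconds at the HFile-based rate.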
The mapreduce.tasktracker.map.tasks.maximum parameter, which defaults to 2, determines the maximum number of map tasks that can run in parallel on a node. Unless it is changed, you should see 2 map tasks running simultaneously on each node.
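As a rough illustration, assuming a classic TaskTracker-based setup where this property is read from mapred-site.xml on each node, raising the limit might look like the fragment below. The value 4 is only a placeholder. Pick a number that matches the cores and memory actually available, and restart the TaskTracker so the change takes effect.

```xml
<!-- mapred-site.xml on each TaskTracker node (value is a placeholder) -->
<property>
  <name>mapreduce.tasktracker.map.tasks.maximum</name>
  <value>4</value>
</property>
```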