Writing to HBase in MapReduce using MultipleOutputs

Published: 2019-05-07 04:13

Question:

I currently have a MapReduce job that uses MultipleOutputs to send data to several HDFS locations. After that completes, I am using HBase client calls (outside of MR) to add some of the same elements to a few HBase tables. It would be nice to add the HBase outputs as just additional MultipleOutputs, using TableOutputFormat. In that way, I would distribute my HBase processing.

Problem is, I cannot get this to work. Has anyone ever used TableOutputFormat in MultipleOutputs...? With multiple HBase outputs?

Basically, I am setting up my collectors like this....

OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter);
OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter);

// First table: key the Put by its row key so the types match the collector.
Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(new ImmutableBytesWritable(mykey.getBytes()), put);

// Second table: same pattern, different named output.
put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(new ImmutableBytesWritable(mykey.getBytes()), put);
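For reference, here is a sketch of how I imagine those named outputs would be declared on the job with the old mapred API; the output names match the getCollector() calls above, the rest is my best guess:

MultipleOutputs.addNamedOutput(jobConf, "hbase1", TableOutputFormat.class,
    ImmutableBytesWritable.class, Put.class);
MultipleOutputs.addNamedOutput(jobConf, "hbase2", TableOutputFormat.class,
    ImmutableBytesWritable.class, Put.class);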

This seems to follow the general idea of writing to HBase, I think.

Part of the issue, as I type this, might be more in the job definition. It looks like MR (and HBase) want a global parameter set, like this....

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

to provide the table name. Trouble is, I have two tables....
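For a single table, the standard setup looks something like this sketch (old mapred API; "articles" stands in for the real table name), which makes it clear the table name is one job-wide setting:

JobConf jobConf = new JobConf(conf);
jobConf.setOutputFormat(TableOutputFormat.class);
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "articles");   // one table name for the whole job
jobConf.setOutputKeyClass(ImmutableBytesWritable.class);
jobConf.setOutputValueClass(Put.class);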

Any ideas?

Thanks

Answer 1:

I've put data into HBase three different ways. The most efficient (and most distributed) was using the HFileOutputFormat class.

I set up the job as follows... (Please note this is edited from the actual code, but the core content remains.)

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
// Point the HBase client at the cluster (these are the standard config keys).
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
// configureIncrementalLoad wires in a partitioner matched to the table's
// region boundaries so the generated HFiles line up with the regions.
HTable hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);
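Note that HFileOutputFormat only writes the HFiles; after the job finishes they still have to be handed off to the region servers. A rough sketch of that step, using the LoadIncrementalHFiles bulk loader (the programmatic form of the completebulkload tool), assuming the job completed successfully:

LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
loader.doBulkLoad(cubeOutputPath, hTable);   // moves the generated HFiles into the table's regions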

As you can see, my Mapper class is called HiveToHBaseMapper - nice and original. :) Here's the (again, rough) definition:

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {

    @Override
    public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String family = config.get("FAMILY");
        String column = config.get("COLUMN");   // placeholder; the real column choice was edited out
        String sValue = val.toString();         // placeholder; the real value extraction was edited out
        Double value = Double.parseDouble(sValue);
        String sKey = generateKey(config);      // generateKey() builds the row key; definition omitted here
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        // Substitute Double.MIN_VALUE for non-positive values, per the original logic.
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
            ? Bytes.toBytes(Double.MIN_VALUE)
            : Bytes.toBytes(value));
        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);
    }
}
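Since the mapper pulls the column family (and, in this rough version, the column) out of the job configuration, the driver is assumed to set those before submission; the key names and values here are placeholders:

conf.set("FAMILY", "f1");        // placeholder column family
conf.set("COLUMN", "measure");   // placeholder column qualifier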

I don't know whether you can fit this into MultipleOutputs or whether you'd need to create a new MR job, but it's the best way I've come across to get data into HBase. :)

Hopefully this points you in the right direction toward a solution.



Answer 2:

In my experience, the best approach is to just put the data into the HBase table as soon as you have it available (unless you are bulk loading). If you have the data available in your map task, that is the best time to push it to HBase. If you don't have the data until the reduce task, then add the push to HBase there. Until you know that HBase is your bottleneck, let HBase worry about the caching issues.
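As a minimal sketch of that approach (new mapreduce API; the table name, family, and qualifier are placeholders): open the table once in setup(), write Puts as you map, and flush in cleanup():

public class DirectToHBaseMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
    private HTable table;

    @Override
    protected void setup(Context context) throws IOException {
        table = new HTable(HBaseConfiguration.create(context.getConfiguration()), "articles");
        table.setAutoFlush(false);   // buffer puts client-side instead of one RPC per put
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Put put = new Put(Bytes.toBytes(value.toString()));
        put.add(Bytes.toBytes("family"), Bytes.toBytes("column"), Bytes.toBytes(value.toString()));
        table.put(put);              // goes to the client-side write buffer
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        table.flushCommits();        // push anything still buffered
        table.close();
    }
}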



Answer 3:

So, apparently, this is not possible with the old mapred packages. There is a new OutputFormat in the mapreduce package set, but I don't want to convert to that right now, so I will have to write multiple MR jobs.
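For anyone landing here later: with the newer org.apache.hadoop.hbase.mapreduce package, MultiTableOutputFormat routes each Put to the table named in the output key, so one job can feed several tables. A rough sketch (table names and Put variables are placeholders):

job.setOutputFormatClass(MultiTableOutputFormat.class);

// In the mapper or reducer, the key selects the destination table:
ImmutableBytesWritable articles = new ImmutableBytesWritable(Bytes.toBytes("articles"));
ImmutableBytesWritable authors = new ImmutableBytesWritable(Bytes.toBytes("authors"));
context.write(articles, articlesPut);
context.write(authors, authorsPut);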