Writing to HBase in MapReduce using MultipleOutput

2019-05-07 04:04发布

I currently have a MapReduce job that uses MultipleOutputs to send data to several HDFS locations. After that completes, I am using HBase client calls (outside of MR) to add some of the same elements to a few HBase tables. It would be nice to add the HBase outputs as just additional MultipleOutputs, using TableOutputFormat. In that way, I would distribute my HBase processing.

Problem is, I cannot get this to work. Has anyone ever used TableOutputFormat in MultipleOutputs...? With multiple HBase outputs?

basically, I am setting up my collectors, like this....

Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter); 
Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter); 
Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(NullWritable.get(), put);

put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(newImmutableBytesWritable(mykey.getBytes()), put);

This seems to follow the general idea of hbase writing, I think.

Part of the issue, as I type this, might be more in the job definition. Looks like MR (and Hbase) want a global parameter set, like this....

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

to provide the table name. Trouble is, I have two tables....

Any ideas?

Thanks

3条回答
beautiful°
2楼-- · 2019-05-07 04:21

I've put data into HBase 3 different ways. The most efficient (and distributed) was using the HFileOutputFormat class.

I set up the job as follows... (Please note this is edited from actual code, but core content remains)

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);      
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);       
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
HTable hTable = null;
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("ZOOKEEPER_QUORUM", hbaseZookeeperQuorum);
hConf.set("ZOOKEEPER_CLIENTPORT", hbaseZookeeperClientPort);
hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);

As we can see, my Mapper class is called HiveToHBaseMapper - Nice and original. :) Here's the (again, rough) definition of it

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {
@Override
public void map(WritableComparable key, Writable val, Context context)
    throws IOException, InterruptedException {
    Configuration config = context.getConfiguration();
    String family = config.get("FAMILY");
    Double value = Double.parseDouble(sValue);
    String sKey = generateKey(config);
    byte[] bKey = Bytes.toBytes(sKey);
    Put put = new Put(bKey);
    put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
        ? Bytes.toBytes(Double.MIN_VALUE)
        : Bytes.toBytes(value));        
    ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
    context.write(ibKey, put);
}

I don't know if you can use this to fit it into the MultipleOutputs or need to create a new MR job. This is the best way I've come across to get data into HBase. :)

This will hopefully get you in the right direction to finding a solution.

查看更多
地球回转人心会变
3楼-- · 2019-05-07 04:26

In my experience, the best approach is to just put the data into the hbase table as soon as you have it available (unless you are bulk loading data). If you have the data available in your map task, that is the best time to push it to hbase. If you don't have the data until a reduce task, then add the push to hbase there. Until you know that HBase is your bottleneck, let HBase worry about the caching issues.

查看更多
我只想做你的唯一
4楼-- · 2019-05-07 04:32

So, apparently, this not possible with the old mapred packages. There is a new OutputFormat in the mapreduce package set, but I don't want to convert to that right now. So, I will have to write multiple MR jobs.

查看更多
登录 后发表回答