MapReduce job with mixed data sources: HBase table

Posted 2019-03-31 08:12

I need to implement an MR job that reads data from both an HBase table and HDFS files. For example, the mappers read records from the HBase table and from the HDFS files; the two sources share the same primary key but have different schemas. A reducer then joins all the columns (from the HBase table and the HDFS files) together.

I searched online and could not find a way to run an MR job with such mixed data sources. MultipleInputs seems to work only for multiple HDFS data sources. Please let me know if you have any ideas. Sample code would be great.

3 answers
贪生不怕死
#2 · 2019-03-31 08:42

There is no out-of-the-box feature that supports this. A possible workaround is to Scan your HBase table, write the Results to an HDFS file first, and then do the reduce-side join using MultipleInputs. This incurs some additional I/O overhead, though.
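
To illustrate the reduce-side join step, here is a minimal plain-Java simulation, not an actual Hadoop job. It assumes both inputs are already text lines of the form "key,value" (for example, after the HBase rows were exported to HDFS). The class name, the source tags "H" and "F", and the sample data are all hypothetical. Each record is tagged with its source, grouped by key, and merged into one row per key; keys missing from one side get an empty column, so this behaves like a full outer join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a reduce-side join; no Hadoop or HBase classes are involved.
class ReduceSideJoinSketch {

    // "Map" step: split "key,value" and tag the value with its source.
    // The tags "H" (HBase export) and "F" (HDFS file) are hypothetical.
    static void mapInto(Map<String, List<String>> shuffle, String tag, String line) {
        String[] parts = line.split(",", 2);
        if (parts.length == 2) {
            shuffle.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(tag + ":" + parts[1]);
        }
    }

    // "Reduce" step: put each tagged value into its column and emit one joined row.
    static String reduce(String key, List<String> taggedValues) {
        String hbaseCol = "";
        String fileCol = "";
        for (String tv : taggedValues) {
            if (tv.startsWith("H:")) {
                hbaseCol = tv.substring(2);
            } else if (tv.startsWith("F:")) {
                fileCol = tv.substring(2);
            }
        }
        return key + "," + hbaseCol + "," + fileCol;
    }

    // Runs both steps; the TreeMap stands in for the shuffle that groups and sorts by key.
    static List<String> join(List<String> hbaseExport, List<String> hdfsFile) {
        Map<String, List<String>> shuffle = new TreeMap<>();
        for (String line : hbaseExport) {
            mapInto(shuffle, "H", line);
        }
        for (String line : hdfsFile) {
            mapInto(shuffle, "F", line);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffle.entrySet()) {
            out.add(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> hbaseExport = Arrays.asList("k1,alice", "k2,bob");
        List<String> hdfsFile = Arrays.asList("k1,30", "k3,41");
        for (String row : join(hbaseExport, hdfsFile)) {
            System.out.println(row);
        }
    }
}
```

In a real job the two mappers registered with MultipleInputs would do the tagging, and the reducer would do the merge; the tag is what lets the reducer place each value in the right column regardless of arrival order.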

神经病院院长
#3 · 2019-03-31 08:42

A Pig script or a Hive query can do that easily.

Sample Pig script:

tbl = LOAD 'hbase://SampleTable'
       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
       'info:* ...', '-loadKey true -limit 5')
       AS (id:bytearray, info_map:map[],...);

fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);

Joined = JOIN tbl BY id, fle BY id;
STORE Joined INTO ...
Evening l夕情丶
#4 · 2019-03-31 08:45

After a few days of investigation (and with help from the HBase user mailing list), I finally figured out how to do it. Here is the source code:

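import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {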

// Mapper for the HDFS text input: each line is expected to be "key,value".
public static class Map extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] sa = value.toString().split(",");
        // Lines that do not split into exactly two fields are skipped.
        if (sa.length == 2) {
            context.write(new Text(sa[0]), new Text(sa[1]));
        }
    }

}

// Mapper for the HBase input: emits (row key, value of column cf:c1).
public static class TableMap extends TableMapper<Text, Text> {
    public static final byte[] CF = Bytes.toBytes("cf");
    public static final byte[] ATTR1 = Bytes.toBytes("c1");

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        String key = Bytes.toString(row.get());
        byte[] cell = value.getValue(CF, ATTR1);
        // getValue returns null when the row has no cf:c1 cell; skip such rows.
        if (cell != null) {
            context.write(new Text(key), new Text(Bytes.toString(cell)));
        }
    }
}


// Reducer: receives the values for one key from both sources.
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Each value is written on its own line under the shared key.
        for (Text val : values) {
            context.write(key, val);
        }
    }
}

public static void main(String[] args) throws Exception {
    Path inputPath1 = new Path(args[0]);   // HDFS text input
    Path inputPath2 = new Path(args[1]);   // placeholder path paired with the HBase input
    Path outputPath = new Path(args[2]);

    String tableName = "test";

    Configuration config = HBaseConfiguration.create();
    Job job = Job.getInstance(config, "ExampleRead");
    job.setJarByClass(MixMR.class);     // class that contains mapper

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.addFamily(Bytes.toBytes("cf"));

    TableMapReduceUtil.initTableMapperJob(
            tableName,        // input HBase table name
            scan,             // Scan instance to control CF and attribute selection
            TableMap.class,   // mapper
            Text.class,       // mapper output key
            Text.class,       // mapper output value
            job);

    job.setReducerClass(Reduce.class);    // reducer class
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // MultipleInputs needs a path for every input. inputPath2 has no effect for the HBase table:
    // TableInputFormat takes the table name and Scan from the job configuration set above.
    MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
    MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);

    FileOutputFormat.setOutputPath(job, outputPath);

    // Exit with a non-zero status if the job fails.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}
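
One detail of the text mapper above is worth spelling out: it keeps only lines that split into exactly two fields, so a line whose value itself contains a comma is silently dropped. The following is a small sketch of how `String.split` behaves here, and of one possible alternative that splits on the first comma only. The class and helper names are hypothetical, and whether this is the right policy depends on the actual file format.

```java
import java.util.Arrays;

// Demonstrates String.split behavior relevant to parsing "key,value" lines.
class CsvSplitSketch {

    // Hypothetical helper: split on the first comma only, so the value may contain commas.
    static String[] keyAndValue(String line) {
        return line.split(",", 2);
    }

    public static void main(String[] args) {
        // Plain split yields three fields, so a length == 2 check would skip this line.
        System.out.println(Arrays.toString("k1,a,b".split(",")));
        // With a limit of 2 the remainder stays together as the value.
        System.out.println(Arrays.toString(keyAndValue("k1,a,b")));
        // A line without any comma still yields a single field and can be rejected.
        System.out.println(keyAndValue("k1").length);
    }
}
```

If the files are known to hold exactly two comma-free fields per line, the original check is sufficient and doubles as a simple validity filter.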
