When using HBase as a source for MapReduce, can I extend TableInputFormatBase to get multiple mappers per region?

Posted 2019-04-23 14:38

I'm thinking about using HBase as a source for one of my MapReduce jobs. I know that TableInputFormat specifies one input split (and thus one mapper) per Region. However, this seems inefficient. I'd really like to have multiple mappers working on a given Region at once. Can I achieve this by extending TableInputFormatBase? Can you please point me to an example? Furthermore, is this even a good idea?

Thanks for the help.

5 answers
甜甜的少女心
#2 · 2019-04-23 15:12

Not sure if you can specify multiple mappers for a given region, but consider the following:

If you think one mapper per region is inefficient (maybe your data nodes don't have enough resources, e.g. number of CPUs), you can specify smaller region sizes in hbase-site.xml.

Here's the page for the default configuration options if you want to look into changing that: http://hbase.apache.org/configuration.html#hbase_default_configurations

Please note that by making the region size small, you will increase the number of files in your DFS, and this can limit the capacity of your Hadoop DFS depending on the memory of your namenode. Remember, the namenode's memory usage is directly related to the number of files in your DFS. This may or may not be relevant to your situation, as I do not know how your cluster is being used. There is never a silver-bullet answer to these questions!
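For reference, region size is governed by the hbase.hregion.max.filesize property. A minimal hbase-site.xml sketch of what the answer describes; the 1 GB value is purely illustrative, not a recommendation:

<!-- hbase-site.xml (sketch): lowering hbase.hregion.max.filesize makes regions
     split sooner, so the table ends up with more regions and thus more mappers. -->
<property>
  <name>hbase.hregion.max.filesize</name>
  <value>1073741824</value> <!-- 1 GB; the shipped default is much larger -->
</property>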

We Are One
#3 · 2019-04-23 15:26

1. It's absolutely fine; just make sure the key ranges are mutually exclusive between the mappers (see the sketch below).

2. Make sure you aren't creating too many clients, as this may lead to a lot of GC, since HBase block cache churning happens during reads.
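A minimal sketch of what "mutually exclusive key ranges" means in practice, using org.apache.hadoop.hbase.client.Scan and org.apache.hadoop.hbase.util.Bytes; the row-key boundaries "a", "m", "z" are made-up placeholders:

// Two scans whose row-key ranges do not overlap, so the mappers driven by
// them never read the same rows twice. Boundaries are placeholders.
Scan first = new Scan();
first.setStartRow(Bytes.toBytes("a"));  // inclusive
first.setStopRow(Bytes.toBytes("m"));   // exclusive

Scan second = new Scan();
second.setStartRow(Bytes.toBytes("m"));
second.setStopRow(Bytes.toBytes("z"));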
放我归山
#4 · 2019-04-23 15:28

With this MultipleScanTableInputFormat, you can use the MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER configuration to control how many mappers should execute against a single region server. The class groups all the input splits by their location (region server), and the RecordReader properly iterates through all aggregated splits for the mapper.

Here is the example

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

This is how it creates the multiple aggregated splits for a single mapper:

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
    final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();

    final Scan scan = getScan();

    // Run the parent's split calculation once per configured (startRow, stopRow)
    // range and collect everything into a single list.
    for (int i = 0; i < startRows.size(); i++) {
        scan.setStartRow(startRows.get(i));
        scan.setStopRow(stopRows.get(i));

        setScan(scan);

        aggregatedSplits.addAll(super.getSplits(context));
    }

    // Set the state back to where it was.
    scan.setStopRow(null);
    scan.setStartRow(null);

    setScan(scan);

    return aggregatedSplits;
}

Partitioning by region server:

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> source = getAggregatedSplits(context);

    if (!partitionByRegionServer) {
        return source;
    }

    // Partition by regionserver
    Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
    for (InputSplit split : source) {
        TableSplit cast = (TableSplit) split;
        String rs = cast.getRegionLocation();

        partitioned.put(rs, cast);
    }
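Wiring this into a job might look roughly like the sketch below. This assumes PARTITIONS_PER_REGION_SERVER is an ordinary int-valued Configuration key exposed by the gist's class (not confirmed here), and the value of 4 is only an example:

// Sketch only: assumes the gist's MultipleScanTableInputFormat exposes
// PARTITIONS_PER_REGION_SERVER as an int-valued Configuration key.
Configuration conf = HBaseConfiguration.create();
conf.setInt(MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER, 4);

Job job = Job.getInstance(conf, "multi-scan-per-regionserver");
job.setInputFormatClass(MultipleScanTableInputFormat.class);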
祖国的老花朵
#5 · 2019-04-23 15:32

This is useful if you want to scan large regions (hundreds of millions of rows) with a filtered scan that matches only a few records, since it helps prevent ScannerTimeoutException.

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class RegionSplitTableInputFormat extends TableInputFormat {

    public static final String REGION_SPLIT = "region.split";

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {

        Configuration conf = context.getConfiguration();
        int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
        List<InputSplit> superSplits = super.getSplits(context);
        if (regionSplitCount <= 0) {
            return superSplits;
        }

        List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);

        for (InputSplit inputSplit : superSplits) {
            TableSplit tableSplit = (TableSplit) inputSplit;
            System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
            byte[] startRow0 = tableSplit.getStartRow();
            byte[] endRow0 = tableSplit.getEndRow();
            boolean discardLastSplit = false;
            // An empty end row means "to the end of the table"; substitute a
            // temporary all-0xFF key so Bytes.split can interpolate, then restore
            // the open-ended boundary on the last sub-split below.
            if (endRow0.length == 0) {
                endRow0 = new byte[startRow0.length];
                Arrays.fill(endRow0, (byte) 255);
                discardLastSplit = true;
            }
            byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
            if (discardLastSplit) {
                split[split.length - 1] = new byte[0];
            }
            for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
                byte[] startRow = split[regionSplit];
                byte[] endRow = split[regionSplit + 1];
                TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
                        tableSplit.getLocations()[0]);
                splits.add(newSplit);
            }
        }

        return splits;
    }
}
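A minimal sketch of wiring this input format into a job. The table name "my_table", the MyMapper class, and the split count of 4 are placeholders I've assumed for illustration, not part of the original answer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class RegionSplitJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Ask for 4 sub-splits (and thus 4 mappers) per region; tune for your cluster.
        conf.setInt(RegionSplitTableInputFormat.REGION_SPLIT, 4);

        Job job = Job.getInstance(conf, "region-split-scan");
        job.setJarByClass(RegionSplitJobDriver.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch rows in bigger batches for MR scans
        scan.setCacheBlocks(false);  // don't churn the block cache with a full scan

        // "my_table" and MyMapper are placeholders for your own table and mapper.
        TableMapReduceUtil.initTableMapperJob("my_table", scan, MyMapper.class,
                ImmutableBytesWritable.class, Result.class, job);

        // Replace the default TableInputFormat set by initTableMapperJob.
        job.setInputFormatClass(RegionSplitTableInputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}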
时光不老,我们不散
#6 · 2019-04-23 15:34

You need a custom input format that extends InputFormat. You can get an idea of how to do this from the answer to the question "I want to scan lots of data (Range based queries), what all optimizations I can do while writing the data so that scan becomes faster". This is a good idea if the data processing time is much greater than the data retrieval time.
