当使用的HBase作为MapReduce的一个源,我可以延伸TableInputFormatBase

2019-07-29 22:42发布

我在考虑使用HBase的为我的MapReduce工作的一个来源。 我知道TableInputFormat指定每个地区的一个输入分裂(因此一个映射器)。 然而,这似乎效率不高。 我真的很想有多个映射器在给定的区域合作一次。 我可以通过扩展TableInputFormatBase实现这一目标? 能否请你点我一个例子吗? 此外,这是甚至是一个好主意?

谢谢您的帮助。

Answer 1:

你需要扩展InputFormat自定义输入格式。 你可以知道如何做到这一点,从回答的问题我要扫描大量的数据(基于范围查询),什么都优化,我可以一边写数据,使扫描变得更快做的 。 这是一个好主意,如果数据处理时间更大于数据检索时间。



Answer 2:

不知道你可以为一个给定的区域指定多个映射器,但考虑以下几点:

如果你觉得一个映射为每个区域的低效(也许你的数据节点没有足够的资源,如CPU数目),您可以或许指定文件HBase的-site.xml中较小的区域的大小。

这里有一个网站的默认CONFIGS选项,如果你想寻找到改变这种: http://hbase.apache.org/configuration.html#hbase_default_configurations

请注意,以使该地区面积不大,你会增加文件的数量在你的DFS,这可以限制你的Hadoop DFS的取决于你的NameNode的内存容量。 请记住,名称节点的内存使用情况,直接关系到您的DFS文件的数量。 这可能会或可能不会初步认识到你的情况,我不知道是如何被使用的集群做。 从未有一个银弹回答这些问题!



Answer 3:

1。 它的精绝只要确保按键是映射器之间的相互排斥。

  1. 你的arent创造太多的客户,因为这可能会导致大量GC,因为在HBase的HBase的读取缓存块发生搅动


Answer 4:

使用此MultipleScanTableInputFormat,您可以使用MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER配置来控制许多映射器应该如何执行对一个RegionServer的。 本课程将组中的所有输入由它们的位置(RegionServer的)分裂,并RecordReader将通过对映射器对所有分裂正常循环。

这里是例子

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

这项工作已创建的多个聚合拆分为单一映射器

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();

final Scan scan = getScan();

for (int i = 0; i < startRows.size(); i++) {
  scan.setStartRow(startRows.get(i));
  scan.setStopRow(stopRows.get(i));

  setScan(scan);

  aggregatedSplits.addAll(super.getSplits(context));
}

// set the state back to where it was.. 
scan.setStopRow(null);
scan.setStartRow(null);

setScan(scan);

return aggregatedSplits;
 }

各地区服务器上创建分区

 @Override
 public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> source = getAggregatedSplits(context);

if (!partitionByRegionServer) {
  return source;
}

// Partition by regionserver
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
for (InputSplit split : source) {
  TableSplit cast = (TableSplit) split;
  String rs = cast.getRegionLocation();

  partitioned.put(rs, cast);
}


Answer 5:

如果你想扫描大的区域(数百万行的百)与调节扫描是仅找到几个记录这将是有益的。 这将防止ScannerTimeoutException的

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class RegionSplitTableInputFormat extends TableInputFormat {

    public static final String REGION_SPLIT = "region.split";

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {

        Configuration conf = context.getConfiguration();
        int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
        List<InputSplit> superSplits = super.getSplits(context);
        if (regionSplitCount <= 0) {
            return superSplits;
        }

        List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);

        for (InputSplit inputSplit : superSplits) {
            TableSplit tableSplit = (TableSplit) inputSplit;
            System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
            byte[] startRow0 = tableSplit.getStartRow();
            byte[] endRow0 = tableSplit.getEndRow();
            boolean discardLastSplit = false;
            if (endRow0.length == 0) {
                endRow0 = new byte[startRow0.length];
                Arrays.fill(endRow0, (byte) 255);
                discardLastSplit = true;
            }
            byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
            if (discardLastSplit) {
                split[split.length - 1] = new byte[0];
            }
            for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
                byte[] startRow = split[regionSplit];
                byte[] endRow = split[regionSplit + 1];
                TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
                        tableSplit.getLocations()[0]);
                splits.add(newSplit);
            }
        }

        return splits;
    }
}


文章来源: When using HBase as a source for MapReduce, can I extend TableInputFormatBase to create multiple splits and multiple mappers for each region?