我在考虑使用HBase的为我的MapReduce工作的一个来源。 我知道TableInputFormat指定每个地区的一个输入分裂(因此一个映射器)。 然而,这似乎效率不高。 我真的很想有多个映射器在给定的区域合作一次。 我可以通过扩展TableInputFormatBase实现这一目标? 能否请你点我一个例子吗? 此外,这是甚至是一个好主意?
谢谢您的帮助。
我在考虑使用HBase的为我的MapReduce工作的一个来源。 我知道TableInputFormat指定每个地区的一个输入分裂(因此一个映射器)。 然而,这似乎效率不高。 我真的很想有多个映射器在给定的区域合作一次。 我可以通过扩展TableInputFormatBase实现这一目标? 能否请你点我一个例子吗? 此外,这是甚至是一个好主意?
谢谢您的帮助。
你需要扩展InputFormat自定义输入格式。 你可以知道如何做到这一点,从回答的问题我要扫描大量的数据(基于范围查询),什么都优化,我可以一边写数据,使扫描变得更快做的 。 这是一个好主意,如果数据处理时间更大于数据检索时间。
不知道你可以为一个给定的区域指定多个映射器,但考虑以下几点:
如果你觉得一个映射为每个区域的低效(也许你的数据节点没有足够的资源,如CPU数目),您可以或许指定文件HBase的-site.xml中较小的区域的大小。
这里有一个网站的默认CONFIGS选项,如果你想寻找到改变这种: http://hbase.apache.org/configuration.html#hbase_default_configurations
请注意,以使该地区面积不大,你会增加文件的数量在你的DFS,这可以限制你的Hadoop DFS的取决于你的NameNode的内存容量。 请记住,名称节点的内存使用情况,直接关系到您的DFS文件的数量。 这可能会或可能不会初步认识到你的情况,我不知道是如何被使用的集群做。 从未有一个银弹回答这些问题!
1。 它的精绝只要确保按键是映射器之间的相互排斥。
使用此MultipleScanTableInputFormat,您可以使用MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER配置来控制许多映射器应该如何执行对一个RegionServer的。 本课程将组中的所有输入由它们的位置(RegionServer的)分裂,并RecordReader将通过对映射器对所有分裂正常循环。
这里是例子
https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90
这项工作已创建的多个聚合拆分为单一映射器
private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();
final Scan scan = getScan();
for (int i = 0; i < startRows.size(); i++) {
scan.setStartRow(startRows.get(i));
scan.setStopRow(stopRows.get(i));
setScan(scan);
aggregatedSplits.addAll(super.getSplits(context));
}
// set the state back to where it was..
scan.setStopRow(null);
scan.setStartRow(null);
setScan(scan);
return aggregatedSplits;
}
各地区服务器上创建分区
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> source = getAggregatedSplits(context);
if (!partitionByRegionServer) {
return source;
}
// Partition by regionserver
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
for (InputSplit split : source) {
TableSplit cast = (TableSplit) split;
String rs = cast.getRegionLocation();
partitioned.put(rs, cast);
}
如果你想扫描大的区域(数百万行的百)与调节扫描是仅找到几个记录这将是有益的。 这将防止ScannerTimeoutException的
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
public class RegionSplitTableInputFormat extends TableInputFormat {
public static final String REGION_SPLIT = "region.split";
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
List<InputSplit> superSplits = super.getSplits(context);
if (regionSplitCount <= 0) {
return superSplits;
}
List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);
for (InputSplit inputSplit : superSplits) {
TableSplit tableSplit = (TableSplit) inputSplit;
System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
byte[] startRow0 = tableSplit.getStartRow();
byte[] endRow0 = tableSplit.getEndRow();
boolean discardLastSplit = false;
if (endRow0.length == 0) {
endRow0 = new byte[startRow0.length];
Arrays.fill(endRow0, (byte) 255);
discardLastSplit = true;
}
byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
if (discardLastSplit) {
split[split.length - 1] = new byte[0];
}
for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
byte[] startRow = split[regionSplit];
byte[] endRow = split[regionSplit + 1];
TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
tableSplit.getLocations()[0]);
splits.add(newSplit);
}
}
return splits;
}
}