MRUnit: passing values in an HBase Result object

Posted 2019-06-12 07:02

I am testing my mapper with MRUnit. From the test class I pass a key and a list of values as input to the mapper. This is the code:

String key = "1234_abc";
ArrayList<KeyValue> list = new ArrayList<KeyValue>();
// KeyValue(row, family, qualifier, value)
KeyValue k1 = new KeyValue(Bytes.toBytes(key), "cf".getBytes(), "Val1".getBytes(), Bytes.toBytes("abc.com"));
KeyValue k2 = new KeyValue(Bytes.toBytes(key), "cf".getBytes(), "Val2".getBytes(), Bytes.toBytes("165"));
list.add(k1);
list.add(k2);
Result result = new Result(list);
mapDriver.withInput(key, result);

The problem is that the Result object retains only the first KeyValue. The others come back as null.

2 Answers
别忘想泡老子
Reply #2 · 2019-06-12 07:45

The problem is that HBase stores columns in lexicographic order. It looks like the Result(KeyValue[] kvs) and Result(List kvs) constructors expect the KeyValues to be passed in that same order.

Here is the solution!

TreeSet<KeyValue> set = new TreeSet<KeyValue>(KeyValue.COMPARATOR);

byte[] row = Bytes.toBytes("row01");
byte[] cf = Bytes.toBytes("cf");
set.add(new KeyValue(row, cf, "cone".getBytes(), Bytes.toBytes("row01_cone_one")));
set.add(new KeyValue(row, cf, "ctwo".getBytes(), Bytes.toBytes("row01_ctwo_two")));
set.add(new KeyValue(row, cf, "cthree".getBytes(), Bytes.toBytes("row01_cthree_three")));
set.add(new KeyValue(row, cf, "cfour".getBytes(), Bytes.toBytes("row01_cfour_four")));
set.add(new KeyValue(row, cf, "cfive".getBytes(), Bytes.toBytes("row01_cfive_five")));
set.add(new KeyValue(row, cf, "csix".getBytes(), Bytes.toBytes("row01_csix_six")));

KeyValue[] kvs = new KeyValue[set.size()];
set.toArray(kvs);

Result result = new Result(kvs);
mapDriver.withInput(key, result);
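
To see what order the TreeSet produces for the six qualifiers above, here is a minimal sketch in plain Java with no HBase dependency. It assumes that, for KeyValues sharing the same row and family, the ordering comes down to an unsigned byte-wise comparison of the qualifiers. The class QualifierOrder and its methods are hypothetical helpers written for this illustration. They are not part of HBase.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not an HBase class): orders qualifier names by an
// unsigned, byte-wise lexicographic comparison.
final class QualifierOrder {

  // Compares two byte arrays lexicographically, treating each byte as unsigned.
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int x = a[i] & 0xff;
      int y = b[i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    // All shared positions are equal: the shorter array sorts first.
    return a.length - b.length;
  }

  // Returns a sorted copy of the qualifiers; the input list is left untouched.
  static List<String> sortQualifiers(List<String> qualifiers) {
    List<String> sorted = new ArrayList<>(qualifiers);
    sorted.sort((p, q) -> compareBytes(
        p.getBytes(StandardCharsets.UTF_8), q.getBytes(StandardCharsets.UTF_8)));
    return sorted;
  }

  public static void main(String[] args) {
    List<String> inserted =
        Arrays.asList("cone", "ctwo", "cthree", "cfour", "cfive", "csix");
    // prints [cfive, cfour, cone, csix, cthree, ctwo]
    System.out.println(sortQualifiers(inserted));
  }
}
```

The insertion order (cone, ctwo, cthree, ...) differs from the sorted order, which is why building the array through a sorted set matters here.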

Hope this will help!

家丑人穷心不美
Reply #3 · 2019-06-12 07:48

I just finished about 6 hours of pain on this issue myself and finally discovered the problem. It appears to be a bug in the org.apache.hadoop.hbase.client.Result class, at least for the version of HBase I am using (0.94.18).

// The below line of code was failing for me when running locally under MRUnit
// but it seemed to succeed when running in production on my cluster.
// org.apache.hadoop.hbase.client.Result result passed in to this method.
Bytes.toString(result.getValue(Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

result.getValue() calls getColumnLatest(), which in turn calls binarySearch(). The binarySearch() method seems to be faulty and almost always returns the wrong index. getColumnLatest() double-checks that it really found the right KeyValue by verifying that the family and qualifier match. They usually do not match, so it returns null.
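
One possible reading, consistent with the ordering explanation in the other answer, is that a binary search only gives meaningful results on sorted input. The sketch below illustrates that general behavior with plain strings and java.util.Arrays.binarySearch. It does not call HBase's own binarySearch(), and the class SearchOnUnsortedInput is a hypothetical name used only for this illustration. The miss on unsorted input is what the standard OpenJDK implementation produces for this particular array; the Java specification leaves the result undefined when the array is not sorted.

```java
import java.util.Arrays;

// Hypothetical illustration (not HBase code): binary search vs. linear search
// on the same elements, first in insertion order, then sorted.
final class SearchOnUnsortedInput {

  // Scans every element, so it does not depend on the array being sorted.
  static int linearSearch(String[] items, String key) {
    for (int i = 0; i < items.length; i++) {
      if (items[i].equals(key)) {
        return i;
      }
    }
    return -1;
  }

  // Returns true if Arrays.binarySearch reports the key as present.
  static boolean binarySearchFinds(String[] items, String key) {
    return Arrays.binarySearch(items, key) >= 0;
  }

  public static void main(String[] args) {
    String[] unsorted = {"cone", "ctwo", "cthree", "cfour", "cfive", "csix"};
    String[] sorted = unsorted.clone();
    Arrays.sort(sorted);

    System.out.println("linear, unsorted: " + linearSearch(unsorted, "cfive"));
    System.out.println("binary, unsorted: " + binarySearchFinds(unsorted, "cfive"));
    System.out.println("binary, sorted:   " + binarySearchFinds(sorted, "cfive"));
  }
}
```

If this reading applies, sorting the KeyValues before constructing the Result would be an alternative to replacing the search.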

I ended up re-implementing the getValue() method and the three methods it uses, and then swapped in the functionally correct implementation in my unit test. There may be a better way to achieve this, but it is late and this is what I came up with (and it does fix the problem):

// Usage: Pass the Result into the newly created getValue() method, rather than
// calling getValue() on the Result object.
Bytes.toString(getValue(result, Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

// Reimplemented Methods:
private byte[] getValue(Result result, byte [] family, byte [] qualifier) {
  KeyValue kv = getColumnLatest(result, family, qualifier);
  if (kv == null) {
    return null;
  }
  return kv.getValue();
}

private KeyValue getColumnLatest(Result result,  byte[] family, byte[] qualifier) {    
  KeyValue [] kvs = result.raw(); // side effect possibly.
  if (kvs == null || kvs.length == 0) {
    return null;
  }
  //int pos = binarySearch(kvs, family, qualifier);
  int pos = linearSearch(kvs, family, qualifier);
  if (pos == -1) {
    return null;
  }
  KeyValue kv = kvs[pos];
  if (kv.matchingColumn(family, qualifier)) {
    return kv;
  }
  return null;
}

private int linearSearch(final KeyValue [] kvs, final byte [] family,
  final byte [] qualifier) {

  int pos = -1;
  int index = 0;
  for (KeyValue kv : kvs) {
    if (byteArraysEqual(family, kv.getFamily()) && byteArraysEqual(qualifier, kv.getQualifier())) {
      pos = index;
      break;
    }
    index++;
  }
  return pos;
}

private boolean byteArraysEqual(final byte[] ba1, final byte[] ba2) {    
  if (ba1 == null || ba2 == null) {
    return false;
  }

  if (ba1.length != ba2.length) {
    return false;
  }

  for (int i = 0; i < ba1.length; i++) {
    if (ba1[i] != ba2[i]) {
      return false;
    }
  }

  return true;
}
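
As a side note, the byteArraysEqual() helper above could probably be shortened with java.util.Arrays.equals. The sketch below is one way to do that while keeping the helper's null handling: Arrays.equals treats two null references as equal, whereas the helper above returns false whenever either argument is null. The class name ByteArrayEquality is hypothetical.

```java
import java.util.Arrays;

// Hypothetical compact variant of the byteArraysEqual() helper.
final class ByteArrayEquality {

  // Returns false if either array is null (matching the helper above);
  // otherwise delegates the length and element comparison to Arrays.equals.
  static boolean byteArraysEqual(byte[] ba1, byte[] ba2) {
    return ba1 != null && ba2 != null && Arrays.equals(ba1, ba2);
  }

  public static void main(String[] args) {
    byte[] cf = {'c', 'f'};
    System.out.println(byteArraysEqual(cf, new byte[] {'c', 'f'}));
    System.out.println(byteArraysEqual(cf, null));
  }
}
```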