Create a secondary index using a coprocessor in HBase

Posted 2019-03-30 18:14


I've been trying to write my own coprocessor that creates a secondary index using the prePut hook. To start, I've simply been trying to get a prePut coprocessor to work. So far, I can have the coprocessor add to the Put object passed to it. What I've found is that I cannot get the coprocessor to write to a row other than the one the passed-in Put is writing to. Obviously, to create a secondary index, I need to figure this out.

Below is the code for my coprocessor, but it doesn't work.
Yes, all the tables exist, and 'colfam1' exists too.
HBase version: 0.92.1-cdh4.1.2 from Cloudera's CDH4

Does anyone know what the problem is?

    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {
        KeyValue kv = new KeyValue(Bytes.toBytes("COPROCESSORROW"), Bytes.toBytes("colfam1"), Bytes.toBytes("COPROCESSOR: " + System.currentTimeMillis()), Bytes.toBytes("IT WORKED"));  // a cell for row "COPROCESSORROW", not the Put's row
        put.add(kv);  // add the cell to the Put passed to the hook
    }

I get the following error:

    ERROR: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time, servers with issues:
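A likely explanation for this first error: in HBase 0.92 a `Put` holds cells for a single row, and `Put.add(KeyValue)` throws an `IOException` when the KeyValue's row does not match the Put's row. Writing an index row therefore needs its own `Put` sent to a table, not an extra cell on the incoming one. The sketch below is a plain-Java model of that row check, not the HBase API; `RowPutDemo`, `RowPut`, `Cell`, and the exception message are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowPutDemo {
    // Hypothetical stand-in for a KeyValue: one cell addressed by row/family/qualifier.
    public static final class Cell {
        final byte[] row, family, qualifier, value;

        public Cell(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
            this.row = row; this.family = family; this.qualifier = qualifier; this.value = value;
        }
    }

    // Hypothetical model of a Put: it targets exactly one row and rejects cells for any other row.
    public static final class RowPut {
        private final byte[] row;
        private final List<Cell> cells = new ArrayList<>();

        public RowPut(byte[] row) { this.row = row.clone(); }

        public RowPut add(Cell cell) throws IOException {
            if (!Arrays.equals(row, cell.row)) {
                throw new IOException("cell row does not match the put's row");
            }
            cells.add(cell);
            return this;
        }

        public int size() { return cells.size(); }
    }

    public static byte[] b(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    public static void main(String[] args) throws IOException {
        RowPut put = new RowPut(b("row1"));
        put.add(new Cell(b("row1"), b("colfam1"), b("q"), b("v")));  // same row: accepted
        try {
            put.add(new Cell(b("COPROCESSORROW"), b("colfam1"), b("q"), b("IT WORKED")));
        } catch (IOException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
        System.out.println("cells in put: " + put.size());  // cells in put: 1
    }
}
```

In this model, only the cell for `row1` ends up in the put; the cell for the other row is refused, which mirrors why the index row has to be written through a separate `Put`.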


I've modified my coprocessor as follows, but I'm still getting an error. Now the postPut (secondary index) row is written, but there is still a timeout error.
The whole table on that region server crashes too, requiring me to restart the region server. Sometimes a restart doesn't help and the entire region server (all tables) is corrupted, requiring a server rebuild.

I have no idea why...!?

    public void start(CoprocessorEnvironment env) throws IOException {
        LOG.info("(start)");  // LOG and pool are fields of the class (not shown)
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> observerContext, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {
        byte[] tableName = observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();

        // not necessary though if you register the coprocessor for the specific table, SOURCE_TBL
        if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE))) {
            return;
        }

        try {
            LOG.info("STARTING postPut");
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
            LOG.info("TURN OFF AUTOFLUSH");
            table.setAutoFlush(false);
            // create row
            LOG.info("Creating new row");
            byte[] rowkey = Bytes.toBytes("COPROCESSOR ROW");
            Put indexput = new Put(rowkey);
            indexput.add(Bytes.toBytes("data"), Bytes.toBytes("CP: " + System.currentTimeMillis()), Bytes.toBytes("IT WORKED!"));
            LOG.info("Writing to table");
            table.put(indexput);
            LOG.info("flushing commits");
            table.flushCommits();  // the call that times out in the log below
            LOG.info("close table");
            table.close();         // returns the table to the pool
        } catch (IllegalArgumentException ex) {
            // handle exception.
        }
    }

    public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("(stop)");
        pool.close();
    }

Here is the region server log (note my logging messages):

    2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: STARTING postPut
    2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: TURN OFF AUTOFLUSH
    2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Creating new row
    2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Writing to table
    2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: flushing commits
    2013-01-30 19:31:39,813 WARN org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation: Failed all from region=test_table,,1359573731255.d41b77b31fafa6502a8f09db9c56b9d8., hostname=node01, port=60020
    java.util.concurrent.ExecutionException: Call to node01/<private_ip>:60020 failed on socket timeout exception: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<private_ip>:56390 remote=node01/<private_ip>:60020]
        at java.util.concurrent.FutureTask$Sync.innerGet(
        at java.util.concurrent.FutureTask.get(
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(
        at org.apache.hadoop.hbase.client.HTable.flushCommits(
        at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.flushCommits(
        at my.package.MyCoprocessor.postPut(
        at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(
        at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchPut(
        at org.apache.hadoop.hbase.regionserver.HRegion.put(
        at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(
        at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(
        at java.lang.reflect.Method.invoke(
        at org.apache.hadoop.hbase.ipc.WritableRpcEngine$
        at org.apache.hadoop.hbase.ipc.HBaseServer$

Solved: in my coprocessor I was writing to the same table the coprocessor was attached to. In short, when I wrote a cell, the coprocessor wrote a cell, which triggered the coprocessor again to write another, and so on. I stopped the loop by checking the row before writing the coprocessor's row.
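The loop and its fix can be modelled in plain Java. This is not the HBase API: `RecursionGuardDemo`, `Table`, `guardedHook`, and the `CP_` row prefix are hypothetical. A post-put hook that writes into the same table fires again for its own write, so the hook checks the incoming row first and skips rows it wrote itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class RecursionGuardDemo {
    // Hypothetical prefix marking rows written by the coprocessor itself.
    public static final String CP_PREFIX = "CP_";

    // Minimal in-memory table whose put() fires a post-put hook, like RegionObserver.postPut.
    public static class Table {
        public final List<String> rows = new ArrayList<>();
        public BiConsumer<Table, String> postPut = (t, r) -> { };

        public void put(String row) {
            rows.add(row);
            postPut.accept(this, row);  // fires for every put, including the hook's own writes
        }
    }

    // Hook with the row check: skip rows the hook wrote, so it cannot re-trigger itself.
    public static void guardedHook(Table t, String row) {
        if (row.startsWith(CP_PREFIX)) {
            return;
        }
        t.put(CP_PREFIX + row);
    }

    public static void main(String[] args) {
        Table t = new Table();
        t.postPut = RecursionGuardDemo::guardedHook;
        t.put("row1");
        t.put("row2");
        System.out.println(t.rows);  // [row1, CP_row1, row2, CP_row2]
    }
}
```

Replacing `guardedHook` with a hook that always calls `t.put(...)` makes each write trigger another one until the stack overflows. In the region server the nested writes travel back through the client to the same server instead, which may explain the blocked `flushCommits` and the 60-second socket timeout in the log above.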


Below is a snippet showing how we use coprocessors in HBase to create a secondary index. It may be helpful to you.

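import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class TestCoprocessor extends BaseRegionObserver {

    private HTablePool pool = null;

    private final static String INDEX_TABLE  = "INDEX_TBL";
    private final static String SOURCE_TABLE = "SOURCE_TBL";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postPut(
        final ObserverContext<RegionCoprocessorEnvironment> observerContext,
        final Put put,
        final WALEdit edit,
        final boolean writeToWAL)
        throws IOException {

        byte[] tableName = observerContext.getEnvironment()
            .getRegion().getRegionInfo().getTableName();

        // Not necessary though if you register the coprocessor
        // for the specific table, SOURCE_TBL
        if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE))) {
            return;
        }

        try {
            // get the value of the indexed column from the incoming Put
            final List<KeyValue> filteredList = put.get(
                Bytes.toBytes("colfam1"), Bytes.toBytes("qual"));
            if (filteredList.isEmpty()) {
                return; // the indexed column is not part of this Put
            }
            byte[] value = filteredList.get(0).getValue();

            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));

            // create the index row key
            byte[] rowkey = mkRowKey(value, put.getRow());
            Put indexput = new Put(rowkey);
            indexput.add(
                Bytes.toBytes("colfam1"),
                Bytes.toBytes("qual"),
                Bytes.toBytes("value.."));

            table.put(indexput);
            table.close(); // returns the table to the pool
        } catch (IllegalArgumentException ex) {
            // handle exception.
        }
    }

    // Hypothetical helper, one possible layout: indexed value, a 0x00 separator, source row key.
    private static byte[] mkRowKey(byte[] value, byte[] sourceRow) {
        return Bytes.add(value, new byte[] { 0 }, sourceRow);
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }
}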

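The row-key layout is the heart of the index. A common choice (an assumption here, not stated in the answer) is the indexed value, a separator, then the source row key: rows with equal values stay distinct, and because HBase keeps rows sorted by key, every source row for one value can be found with a prefix scan. The plain-Java model below uses a `TreeMap` as a stand-in for the sorted index table; `IndexKeyDemo`, `mkRowKey(value, sourceRow)`, and the NUL separator are hypothetical and assume values never contain that character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class IndexKeyDemo {
    // Hypothetical separator between the indexed value and the source row key.
    public static final char SEP = '\0';

    // Sorted map standing in for the index table (HBase keeps rows sorted by key).
    private final TreeMap<String, String> index = new TreeMap<>();

    // Hypothetical mkRowKey: indexed value, separator, then the source row key.
    public static String mkRowKey(String value, String sourceRow) {
        return value + SEP + sourceRow;
    }

    // What postPut would do for each source write: add one index row for the value.
    public void onSourcePut(String sourceRow, String value) {
        index.put(mkRowKey(value, sourceRow), sourceRow);
    }

    // Prefix scan over [value + SEP, value + (SEP + 1)): all source rows with this value.
    public List<String> lookup(String value) {
        String start = value + SEP;
        String stop = value + (char) (SEP + 1);
        return new ArrayList<>(index.subMap(start, stop).values());
    }

    public static void main(String[] args) {
        IndexKeyDemo demo = new IndexKeyDemo();
        demo.onSourcePut("user1", "berlin");
        demo.onSourcePut("user2", "paris");
        demo.onSourcePut("user3", "berlin");
        System.out.println(demo.lookup("berlin"));  // [user1, user3]
    }
}
```

The separator keeps a lookup for `berlin` from also matching a value such as `berlinx`, since the scan stops at the first key greater than `berlin` followed by the separator.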

To register the coprocessor above on SOURCE_TBL, open the HBase shell and run the following steps:

  1. disable 'SOURCE_TBL'
  2. alter 'SOURCE_TBL', METHOD => 'table_att','coprocessor'=>'file:///path/to/coprocessor.jar|TestCoprocessor|1001'
  3. enable 'SOURCE_TBL'
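In step 2 the value of the `coprocessor` attribute is a `|`-separated spec: the jar path, the coprocessor class name (fully qualified if the class lives in a package), and a priority, optionally followed by `key=value` arguments. Each region server loads the jar from that path, so a `file:///` path has to exist locally on every region server; an HDFS path is a common alternative. The helper below only assembles that string and the `alter` command; `CoprocessorSpec`, `spec`, and `alterCommand` are hypothetical names, and the jar path is the placeholder from step 2.

```java
public class CoprocessorSpec {
    // Builds the 'coprocessor' attribute value: jarPath|className|priority[|args].
    public static String spec(String jarPath, String className, int priority, String args) {
        String base = jarPath + "|" + className + "|" + priority;
        return (args == null || args.isEmpty()) ? base : base + "|" + args;
    }

    // Builds the hbase shell command used in step 2.
    public static String alterCommand(String table, String spec) {
        return "alter '" + table + "', METHOD => 'table_att', 'coprocessor' => '" + spec + "'";
    }

    public static void main(String[] args) {
        String s = spec("file:///path/to/coprocessor.jar", "TestCoprocessor", 1001, null);
        System.out.println(alterCommand("SOURCE_TBL", s));
    }
}
```

Run with these arguments, it prints the same `alter` line as step 2; the table still has to be disabled before and enabled after, as in steps 1 and 3.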


Secondary indexes have now been built into HBase; see this blog entry for details. There is no need to use coprocessors in HBase for this.