HBase bulk delete as “complete bulk load”

Posted 2019-05-10 02:00

I would like to delete 300 million rows from an HBase table. I could use the HBase API and send batches of Delete objects, but I am afraid it would take a lot of time.

That was the case with earlier code where I wanted to insert millions of rows. Instead of using the HBase API and sending batches of Puts, I used a MapReduce job that emits RowKey / Put pairs and calls HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) to set up the reducer, so that it writes output ready to be fast-loaded by LoadIncrementalHFiles (complete bulk load). It was much, much quicker (5 minutes instead of 3 hours).
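For reference, the driver for that bulk-load job looked roughly like the sketch below. This is a hedged reconstruction, not the original code: BulkPutDriver, MyPutMapper, the table name "my_table", and the input/output paths are placeholders, and the package names are the HBase 1.x ones.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkPutDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bulk-put");
        job.setJarByClass(BulkPutDriver.class);
        job.setMapperClass(MyPutMapper.class); // placeholder: emits (ImmutableBytesWritable, Put)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path hfiles = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, hfiles);

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             Table table = conn.getTable(TableName.valueOf("my_table"));
             RegionLocator locator = conn.getRegionLocator(table.getName())) {
            // installs PutSortReducer + TotalOrderPartitioner and sets
            // HFileOutputFormat2 as the job's output format
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            if (job.waitForCompletion(true)) {
                // "complete bulk load": move the generated HFiles into the regions
                new LoadIncrementalHFiles(conf).doBulkLoad(hfiles, admin, table, locator);
            }
        }
    }
}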

So I wanted to use the same technique for the bulk delete.

However, it seems that I cannot use this technique with Delete, as HFileOutputFormat2 only knows how to configure the reducer for KeyValue or Put (PutSortReducer); nothing exists for Delete.

My first question is: why is there no "DeleteSortReducer" to enable the complete bulk load technique for Delete? Is it just something missing that has not been done yet, or is there a deeper reason that justifies it?

Second question, which is related: if I copy/paste the code of PutSortReducer, adapt it for Delete, and pass it as my job's reducer, will it work? Will the HBase complete bulk load produce HFiles full of tombstones?

Example:

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

public class DeleteSortReducer extends
        Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(
            ImmutableBytesWritable row,
            Iterable<Delete> deletes,
            Context context)
            throws IOException, InterruptedException {
        // although reduce() is called per-row, handle the pathological case
        // of a huge row by flushing in chunks below a RAM threshold
        // (config key kept from PutSortReducer, which this class is adapted from)
        long threshold = context.getConfiguration().getLong(
                "putsortreducer.row.threshold", 1L * (1 << 30));
        Iterator<Delete> iter = deletes.iterator();
        while (iter.hasNext()) {
            // collect the delete-marker KeyValues of this chunk in sorted order
            TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
            long curSize = 0;
            // stop at the end of the row or at the RAM threshold
            while (iter.hasNext() && curSize < threshold) {
                Delete d = iter.next();
                for (List<Cell> cells : d.getFamilyCellMap().values()) {
                    for (Cell cell : cells) {
                        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                        map.add(kv);
                        curSize += kv.heapSize();
                    }
                }
            }
            context.setStatus("Read " + map.size() + " entries of " + map.getClass()
                    + "(" + StringUtils.humanReadableInt(curSize) + ")");
            int index = 0;
            for (KeyValue kv : map) {
                context.write(row, kv);
                if (++index % 100 == 0)
                    context.setStatus("Wrote " + index);
            }

            // if we have more entries to process
            if (iter.hasNext()) {
                // force flush because we cannot guarantee intra-row sorted order
                context.write(null, null);
            }
        }
    }
}
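For completeness, wiring this reducer into the job would look something like the sketch below. This is an assumption on my part: in HBase 1.x, HFileOutputFormat2.configureIncrementalLoad only picks a reducer for KeyValue, Put, or Text map output value classes, so the adapted reducer would have to be set explicitly after the call.

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Delete.class);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
// configureIncrementalLoad does not know Delete as a map output value class,
// so install the adapted reducer ourselves
job.setReducerClass(DeleteSortReducer.class);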

Tags: hbase
1 Answer

Emotional °昔 · 2019-05-10 03:01

First of all, a few words about how the delete operation works in HBase. On a delete command, HBase marks the data as deleted and writes information about this to the HFile. The data is not actually deleted from disk; two records are present in storage: the data and the deletion marker. Only after a major compaction is the data physically removed from disk.

All of this information is represented as KeyValues. A KeyValue that carries data has KeyValue.Type equal to Put. A deletion marker has KeyValue.Type set to one of the following values: Delete, DeleteColumn, DeleteFamily, or DeleteFamilyVersion.

In your case, you can achieve a bulk deletion by creating KeyValues with the appropriate value for KeyValue.Type. For example, if you want to delete only one column, you should create a KeyValue using the constructor:

KeyValue(byte[] row, byte[] family, byte[] qualifier, long timestamp, KeyValue.Type type)

// example
KeyValue kv = new KeyValue(row, family, qualifier, time, KeyValue.Type.DeleteColumn);

The answer to the first question: you don't need a special DeleteSortReducer; you should configure the reducer for KeyValue and emit the deletion markers yourself. The answer to the second question is no.
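As an illustration of that approach, here is a minimal mapper sketch that turns each input line (assumed to contain one row key) into a DeleteFamily tombstone. DeleteMapper, the "cf" column family, and the text-line input are illustrative assumptions, not something from the original answer:

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DeleteMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    // placeholder column family; adjust to the real table schema
    private static final byte[] FAMILY = Bytes.toBytes("cf");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // assumption: each input line holds one row key to delete
        byte[] row = Bytes.toBytes(line.toString());
        // a DeleteFamily tombstone covers every cell of the family
        // whose timestamp is <= the one given here
        KeyValue kv = new KeyValue(row, FAMILY, null,
                System.currentTimeMillis(), KeyValue.Type.DeleteFamily);
        context.write(new ImmutableBytesWritable(row), kv);
    }
}

With the map output value class set to KeyValue, HFileOutputFormat2.configureIncrementalLoad installs the standard KeyValueSortReducer, and the bulk-loaded HFiles then carry the tombstones until the next major compaction removes the covered data.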
