I would like to delete 300 million rows from an HBase table. I could use the HBase API and send batches of Delete objects, but I am afraid it would take a lot of time.
That was the case with some previous code where I wanted to insert millions of rows. Instead of using the HBase API and sending batches of Puts, I used a MapReduce job which emits RowKey / Put as values and calls HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
to set up my Reducer so that it writes output directly in a form ready to be fast-loaded by LoadIncrementalHFiles
(complete bulk load). It was much, much quicker (5 minutes instead of 3 hours).
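To make this concrete, the driver of that Put bulk load job looked roughly like this (a simplified sketch, assuming the HBase 1.x client API; MyPutMapper, the table name and the paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkPutDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "bulk-put");
    job.setJarByClass(BulkPutDriver.class);
    job.setMapperClass(MyPutMapper.class);                    // placeholder mapper emitting (ImmutableBytesWritable, Put)
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));     // input data
    Path hfiles = new Path(args[1]);                          // staging directory for the generated HFiles
    FileOutputFormat.setOutputPath(job, hfiles);

    TableName name = TableName.valueOf("my_table");           // placeholder table name
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(name);
         RegionLocator regionLocator = connection.getRegionLocator(name);
         Admin admin = connection.getAdmin()) {
      // Sets the total order partitioner, the PutSortReducer and HFileOutputFormat2 for us.
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
      if (job.waitForCompletion(true)) {
        // Complete bulk load: moves the generated HFiles into the table's regions.
        new LoadIncrementalHFiles(conf).doBulkLoad(hfiles, admin, table, regionLocator);
      }
    }
  }
}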
So I wanted to do the same thing for the bulk delete.
However, it seems that I cannot use this technique with Delete, because HFileOutputFormat2
only configures a Reducer for KeyValue (KeyValueSortReducer) or Put (PutSortReducer); nothing exists for Delete.
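If I read the HFileOutputFormat2 source correctly (paraphrasing from memory, so the exact code may differ between HBase versions), the Reducer is picked from the map output value class roughly like this:

// Paraphrased from HFileOutputFormat2.configureIncrementalLoad (may differ per version)
Class<?> mapOutputValueClass = job.getMapOutputValueClass();
if (KeyValue.class.equals(mapOutputValueClass)) {
  job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(mapOutputValueClass)) {
  job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(mapOutputValueClass)) {
  job.setReducerClass(TextSortReducer.class);
} else {
  // unknown map output value type: only a warning is logged, no reducer is set
}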
My first question is: why is there no "DeleteSortReducer" to enable the complete bulk load technique for Delete? Is it just something missing that has not been done yet, or is there a deeper reason that justifies it?
Second question, which is related: if I copy/paste the code of PutSortReducer, adapt it for Delete and pass it as my job's Reducer, is it going to work? Is HBase's complete bulk load going to produce HFiles full of tombstones?
Example (a copy/paste of PutSortReducer with Put replaced by Delete):

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

public class DeleteSortReducer extends
    Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> {

  @Override
  protected void reduce(ImmutableBytesWritable row, Iterable<Delete> deletes, Context context)
      throws IOException, InterruptedException {
    // Although reduce() is called once per row, handle the pathological case of a huge row.
    // The configuration key is kept as-is from PutSortReducer (default threshold: 1 GiB).
    long threshold = context.getConfiguration().getLong(
        "putsortreducer.row.threshold", 1L * (1 << 30));
    Iterator<Delete> iter = deletes.iterator();
    while (iter.hasNext()) {
      // Sorted buffer of the delete markers (tombstones) for this row.
      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
      long curSize = 0;
      // Fill the buffer until the input is exhausted or the RAM threshold is reached.
      while (iter.hasNext() && curSize < threshold) {
        Delete d = iter.next();
        for (List<Cell> cells : d.getFamilyCellMap().values()) {
          for (Cell cell : cells) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
            map.add(kv);
            curSize += kv.heapSize();
          }
        }
      }
      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
          + "(" + StringUtils.humanReadableInt(curSize) + ")");
      int index = 0;
      for (KeyValue kv : map) {
        context.write(row, kv);
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index);
        }
      }
      // If there are more entries to process, force a flush because we cannot
      // guarantee intra-row sorted order across buffers.
      if (iter.hasNext()) {
        context.write(null, null);
      }
    }
  }
}
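The driver would be the same as for the Put bulk load above, except that the mapper emits Delete values and I override the Reducer chosen by configureIncrementalLoad (which, if I read the source correctly, only logs a warning when the map output value class is unknown). MyDeleteMapper is again a placeholder:

// Only the differences compared to the Put driver sketched above.
job.setMapperClass(MyDeleteMapper.class);                 // placeholder mapper emitting (ImmutableBytesWritable, Delete)
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Delete.class);
// configureIncrementalLoad does not know Delete, so set the Reducer by hand
// after it has configured the partitioner and the output format.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
job.setReducerClass(DeleteSortReducer.class);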