I would like to delete 300 million rows from an HBase table. I could use the HBase client API and send batches of Delete objects, but I am afraid that would take a very long time.
That was the case for a previous job where I wanted to insert millions of rows. Instead of using the HBase API and sending batches of Puts, I used a MapReduce job that emits RowKey / Put pairs and calls HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) to set up my reducer, so that it writes output directly in a form that can be fast-loaded by LoadIncrementalHFiles (complete bulk load). It was much, much quicker (5 minutes instead of 3 hours).
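For reference, the previous Put bulk-load job looked roughly like this (the table name, paths, column family/qualifier and the mapper are simplified placeholders, assuming the HBase 1.x MapReduce API):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadPutsDriver {

    // Placeholder mapper: parse your input and emit (row key, Put) pairs.
    public static class MyPutMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            byte[] row = Bytes.toBytes(value.toString().trim()); // assumption: one row key per line
            Put put = new Put(row);
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bulk-load-puts");
        job.setJarByClass(BulkLoadPutsDriver.class);
        job.setMapperClass(MyPutMapper.class);                 // emits (ImmutableBytesWritable, Put)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFiles are written here

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("my_table"));
             RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
            // Wires up PutSortReducer, the total-order partitioner and HFileOutputFormat2.
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        }

        // After the job finishes, LoadIncrementalHFiles (completebulkload) is run on args[1].
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}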
So I wanted to do the same thing for a bulk delete.
However, it seems that I cannot use this technique with Delete, because HFileOutputFormat2 only knows how to configure a reducer for KeyValue (KeyValueSortReducer) or Put (PutSortReducer); nothing exists for Delete.
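For context, as far as I can tell the reducer selection inside HFileOutputFormat2.configureIncrementalLoad looks roughly like this (paraphrased, the exact code may differ between HBase versions):

// Inside HFileOutputFormat2.configureIncrementalLoad(...), approximately:
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
} else {
    // no reducer is configured for other value types (e.g. Delete)
}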
My first question is: why is there no "DeleteSortReducer" to enable the complete bulk load technique for Delete? Is it just something missing that has not been done yet, or is there a deeper reason for it?
Second, related question: if I copy/paste the code of PutSortReducer, adapt it for Delete and set it as my job's reducer, will it work? Will the HBase complete bulk load then produce HFiles full of tombstones?
Example:
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

public class DeleteSortReducer extends
        Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Delete> deletes,
            Context context) throws IOException, InterruptedException {
        // although reduce() is called per-row, handle pathological case
        long threshold = context.getConfiguration().getLong(
                "putsortreducer.row.threshold", 1L * (1 << 30));
        Iterator<Delete> iter = deletes.iterator();
        while (iter.hasNext()) {
            TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
            long curSize = 0;
            // stop at the end or the RAM threshold
            while (iter.hasNext() && curSize < threshold) {
                Delete d = iter.next();
                for (List<Cell> cells : d.getFamilyCellMap().values()) {
                    for (Cell cell : cells) {
                        // cells carried by a Delete already have a delete KeyValue.Type
                        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                        map.add(kv);
                        curSize += kv.heapSize();
                    }
                }
            }
            context.setStatus("Read " + map.size() + " entries of " + map.getClass()
                    + "(" + StringUtils.humanReadableInt(curSize) + ")");
            int index = 0;
            for (KeyValue kv : map) {
                context.write(row, kv);
                if (++index % 100 == 0)
                    context.setStatus("Wrote " + index);
            }
            // if we have more entries to process
            if (iter.hasNext()) {
                // force flush because we cannot guarantee intra-row sorted order
                context.write(null, null);
            }
        }
    }
}
First of all, a few words about how the delete operation works in HBase. When a delete command is received, HBase marks the data as deleted and writes a deletion marker into the HFile. The data is not actually removed from disk, so two records are present in the store: the data itself and the deletion marker. Only after a (major) compaction is the data physically removed from disk storage.
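You can see this for yourself with a raw scan, which returns delete markers alongside data cells. A small sketch using the standard client API (table and column names are placeholders):

Configuration conf = HBaseConfiguration.create();
try (Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf("my_table"))) {

    // delete all versions of one column of one row through the normal client API
    Delete delete = new Delete(Bytes.toBytes("row-1"));
    delete.addColumns(Bytes.toBytes("cf"), Bytes.toBytes("col"));
    table.delete(delete);

    // a raw scan also returns delete markers, not only data cells
    Scan scan = new Scan();
    scan.setRaw(true);
    try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                // until a major compaction runs, you will see both the original Put cell
                // and a cell of type DeleteColumn for the deleted column
                System.out.println(cell);
            }
        }
    }
}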
All this information is represented as KeyValue objects. A KeyValue carrying data has its KeyValue.Type equal to Put. For a deletion marker, KeyValue.Type is set to one of the following values: Delete, DeleteColumn, DeleteFamily or DeleteFamilyVersion.
In your case, you can achieve a bulk deletion by creating KeyValues with the appropriate KeyValue.Type. For example, if you want to delete only one column, you create the KeyValue using the constructor that takes a type, as in the sketch below.
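A minimal sketch, where row, family, qualifier and ts are placeholders for your own values:

byte[] row = Bytes.toBytes("row-1");
byte[] family = Bytes.toBytes("cf");
byte[] qualifier = Bytes.toBytes("col");
long ts = System.currentTimeMillis();

// DeleteColumn marks all versions of the column up to ts for deletion;
// KeyValue.Type.Delete would target a single version instead.
KeyValue deleteMarker = new KeyValue(row, family, qualifier, ts, KeyValue.Type.DeleteColumn);

// In the reducer, emit it the same way PutSortReducer emits data cells:
// context.write(new ImmutableBytesWritable(row), deleteMarker);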
So, the answer to your first question is that you don't need a special DeleteSortReducer: you should configure your job around KeyValue instead (emit KeyValue from the mapper, so that configureIncrementalLoad selects KeyValueSortReducer). The answer to the second question is no.
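Putting it together, here is a sketch of a mapper that emits delete markers as KeyValues, so that configureIncrementalLoad wires in KeyValueSortReducer for you (the input format, column family/qualifier and line parsing are assumptions for the example):

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DeleteMarkerMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private static final byte[] CF = Bytes.toBytes("cf");    // assumed column family
    private static final byte[] COL = Bytes.toBytes("col");  // assumed qualifier

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumption: each input line contains one row key to delete
        byte[] row = Bytes.toBytes(value.toString().trim());
        KeyValue marker = new KeyValue(
                row, CF, COL, System.currentTimeMillis(), KeyValue.Type.DeleteColumn);
        context.write(new ImmutableBytesWritable(row), marker);
    }
}

The job setup is then the same as for the Put bulk load you sketched, except that the map output value class is KeyValue.class; the generated HFiles then contain the delete markers and are loaded with LoadIncrementalHFiles as before.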