我想要在一个HBase表中删除3亿行数据。我可以使用HBase API并发送Delete对象的批处理来实现,但我担心这会花费很长时间。
之前我想要插入数百万行数据时,也是这样。但我没有使用HBase API和发送Put的批处理。相反,我使用了一个Map Reduce作业,该作业将RowKey / Put作为值发出,并使用设置了我的Reducer,以便直接编写输出以便快速加载
所以我想用同样的方法进行批量删除。
然而,似乎我不能像Put一样使用此技术来进行Delete操作,因为尝试配置KeyValue或Put的Reducer (PutSortReducer),但对于Delete则不存在。
我的第一个问题是:为什么没有“DeleteSortReducer”以启用Delete的完整批量加载技术?这只是缺少的东西,还是有更深层次的原因可以证明?
第二个问题,有点相关:如果我复制/粘贴PutSortReducer的代码,将其调整为Delete,然后将其作为我的作业Reducer传递,它会工作吗?HBase完整的批量加载是否会产生满是墓碑的HFiles?
例子:
之前我想要插入数百万行数据时,也是这样。但我没有使用HBase API和发送Put的批处理。相反,我使用了一个Map Reduce作业,该作业将RowKey / Put作为值发出,并使用设置了我的Reducer,以便直接编写输出以便快速加载
LoadIncrementalHFiles
(完整批量加载)。它比原来快得多(5分钟而不是3小时)。所以我想用同样的方法进行批量删除。
然而,似乎我不能像Put一样使用此技术来进行Delete操作,因为尝试配置KeyValue或Put的Reducer (PutSortReducer),但对于Delete则不存在。
我的第一个问题是:为什么没有“DeleteSortReducer”以启用Delete的完整批量加载技术?这只是缺少的东西,还是有更深层次的原因可以证明?
第二个问题,有点相关:如果我复制/粘贴PutSortReducer的代码,将其调整为Delete,然后将其作为我的作业Reducer传递,它会工作吗?HBase完整的批量加载是否会产生满是墓碑的HFiles?
例子:
public class DeleteSortReducer extends
Reducer<ImmutableBytesWritable, Delete, ImmutableBytesWritable, KeyValue> {
@Override
protected void reduce(
ImmutableBytesWritable row,
java.lang.Iterable<Delete> deletes,
Reducer<ImmutableBytesWritable, Delete,
ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException
{
// although reduce() is called per-row, handle pathological case
long threshold = context.getConfiguration().getLong(
"putsortreducer.row.threshold", 1L * (1<<30));
Iterator<Delete> iter = deletes.iterator();
while (iter.hasNext()) {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
long curSize = 0;
// stop at the end or the RAM threshold
while (iter.hasNext() && curSize < threshold) {
Delete d = iter.next();
for (List<Cell> cells: d.getFamilyCellMap().values()) {
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
map.add(kv);
curSize += kv.heapSize();
}
}
}
context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+ "(" + StringUtils.humanReadableInt(curSize) + ")");
int index = 0;
for (KeyValue kv : map) {
context.write(row, kv);
if (++index % 100 == 0)
context.setStatus("Wrote " + index);
}
// if we have more entries to process
if (iter.hasNext()) {
// force flush because we cannot guarantee intra-row sorted order
context.write(null, null);
}
}
}
}