使用MapReduce作业进行HBase批量删除

3
使用mapreduce作业时,我正在尝试从Hbase表中删除行。 我遇到了以下错误。
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Delete cannot be cast to org.apache.hadoop.hbase.KeyValue
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:124)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.

看起来这是由于configureIncrementalLoad设置为KeyValue输出引起的。它只有PutSortReducer和KeyValueSortReducer,但没有DeleteSortReducer。

我的代码:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DeleteRows extends Configured implements Tool {

    public static class Map extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Delete> {

        ImmutableBytesWritable hKey = new ImmutableBytesWritable();
        Delete delRow;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            hKey.set(value.getBytes());
            delRow = new Delete(hKey.get());
            context.write(hKey, delRow);
            // Update counters
            context.getCounter("RowsDeleted", "Success").increment(1);
        }
    }


    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "Delete stuff!");
        job.setJarByClass(DeleteRows.class);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Delete.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        HTable hTable = new HTable(args[2]);
        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        return (0);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DeleteRows(), args);
        System.exit(exitCode);
    }
}

是否有更好/更快的方法,可以使用行键批量删除大量行?显然,在映射器中逐个删除每一行是可能的,但我想这比将删除操作批量推送到正确的区域服务器要慢。

2个回答

2
你的目标是生成包含Delete流(实际上是将其标记为KeyValue)的HFile。标准的做法是使用HFileOutputFormat。你只能将KeyValue更改的流放入此格式中,并且有两个标准的reduce函数:PutSortReducerKeyValueSortReducer。将reduce任务数设置为0,实际上会直接将所有的Delete传递给输出格式,这当然是行不通的。
你最明显的选择:
  • 添加你的reduce函数DeleteSortReducer。这样的reduce函数非常简单,你可以几乎直接复制。你只需要从Delete中提取出单独的KeyValue流并对其进行排序。对于你来说,PutSortReducer是一个很好的例子。因为Put的更改没有被排序,所以需要这样的reduce函数。
  • 构建适当的KeyValue流,其中包含删除标记,而不是Delete流。这可能是速度最快的方法。

Roman,你说的创建一个适当的KeyValue包含删除标记是什么意思?维护自己的字段来确定行是否已被删除,然后定期清理所有需要删除的行? - reducer
不,HBase中的删除实际上是标记而非操作:http://hadoop-hbase.blogspot.com/2011/12/deletion-in-hbase.html - Roman Nikitchenko

0
原来使用TableMapReduceUtil.initTableReducerJob来设置reducer而不是HFileOutputFormat.configureIncrementalLoad,代码就可以正常工作。
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);

然而,这仍然不能为completebulkload实用程序创建删除。它只是执行删除RPC。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接