如何以编程方式最快地批量加载数据到HBase?

19

我有一个纯文本文件,可能包含数百万行,需要自定义解析,然后尽可能快地将其加载到 HBase 表中(使用 Hadoop 或 HBase Java 客户端)。

我的当前解决方案基于一个 MapReduce 作业,没有 Reduce 部分。我使用 FileInputFormat 读取文本文件,以便每行传递给我的 Mapper 类的 map 方法。此时,将解析该行以形成 Put 对象,并将其写入context。然后,TableOutputFormat 接收 Put 对象并将其插入表格。

这个解决方案平均插入速率为每秒 1,000 行,这比我预期的要慢。 我的 HBase 设置在单台服务器上的伪分布式模式下运行。

有趣的是,在插入 1,000,000 行时,会生成 25 个 Mapper(任务),但它们是串行运行的(一个接一个)。 这种情况正常吗?

以下是我当前解决方案的代码:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

我假设你想让你的标题为“批量加载”,而不是“bluk加载”...但如果我的更正有误,请告诉我。 :-) - Michael Dautermann
你读过这个吗?http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html - Chris Shain
感谢迈克尔纠正我的拼写错误,我实际上已经仔细检查了我的问题两次...唉,在凌晨2点进行stackoverflow真是太困难了。 - Cihan Keser
@Chris:是的,我之前读过那个链接。关于预先划分我的区域:我真的没有理解那个概念 :( 你能解释一下或者告诉我如何做吗? - Cihan Keser
2
每一行都属于一个区域,由其行键确定。您可以在Web UI中查看每个区域的键范围。如果您不预先拆分区域,则插入的数据将全部进入第一个区域,直到达到拆分该区域的阈值为止,此时它将在区域键中发生的任何键上均匀拆分。如果您不预先拆分区域,则所有的puts都会进入一个区域(在一个节点上),直到发生拆分,然后只进入两个区域等等。通过预先拆分,您可以从一开始就并行写入。 - Chris Shain
显示剩余2条评论
2个回答

17

我经历了一个与你类似的过程,试图找到一种有效的方式从MR加载数据到HBase。我发现使用 HFileOutputFormat 作为 MR 的 OutputFormatClass 可以起作用。

以下是我的代码基础,我用它来生成 job 和 Mapper 的 map 函数,写出数据非常快速。我们不再使用它,所以我手头没有具体数字,但大约在不到一分钟内处理了250万记录。

这是我编写的函数(精简版),用于生成MapReduce进程的任务,将数据放入HBase中:

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}

这是我从 HiveToHBaseMapper 类中获取的 map 函数(稍作编辑)。

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        String Value = strs[VALUE_INDEX];
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}

我非常确定这不会是一个适合你复制粘贴的解决方案。显然,我在这里处理的数据不需要任何定制处理(这是在此之前做的MR作业)。我想提供的主要内容是HFileOutputFormat。其余部分只是我使用它的示例。 :)
我希望这能让你找到一个稳固的好解决方案。 :


1
我在我的代码中尝试使用HfileOutputFormat,但是一直收到以下异常,有什么想法吗?java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:82) at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156) ... - Cihan Keser
@kramer,出现类型转换错误通常是因为尝试“写入”一个与期望类型不同的变量。但这并不一定是唯一的原因。如果想要更准确的判断,需要查看代码。 - QuinnG
HFileOutputFormat比TableOutputFormat更快吗?在区域分割相等的情况下。 - Alexey Tigarev
1
哪个 jar 包含了 HfileOutputFormat 类?我找不到它。 - Jon Cardoso-Silva

0
有一个有趣的事情,那就是在插入100万行数据时,会派生25个映射器(任务),但它们串行运行(一个接一个);这正常吗? mapreduce.tasktracker.map.tasks.maximum参数默认值为2,决定了一个节点上可以并行运行的最大任务数。除非更改,否则每个节点应该同时运行2个地图任务。

尝试过了,但结果没有改变。 - Cihan Keser
你在哪里指定了参数?应该在所有节点的mapred-site.xml中在Hadoop守护程序启动之前指定。请查看此文档。你如何验证?可以从JobTracker Web控制台进行验证。 - Praveen Sripati

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接