Hbase批量加载数据时追加而非覆盖数据

3

我正在使用Java中的MapReduce和Bulkload将数据加载到Hbase中。基本上,我创建了一个Mapper,并使用HFileOutputFormat2.configureIncrementalLoad(完整代码在问题结尾),用于reduce。我使用一个仅从文件中读取一些字节并创建put的mapper。然后使用LoadIncrementalHFiles.doBulkLoad将数据写入Hbase。这一切都很顺利。但是当执行此操作时,它会覆盖Hbase中旧的值。因此,我正在寻找一种可以追加数据的方法,就像API中的append函数一样。感谢您阅读,希望你们中的一些人能够帮助我 :)

public int run(String[] args) throws Exception {
    int result=0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name",TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
    configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);

    Job job = Job.getInstance(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);

    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    HFileOutputFormat2.setOutputPath(job,new Path((outputPath)));
    job.setMapOutputValueClass(Put.class);
    Connection c = ConnectionFactory.createConnection(configuration);
    Table t = c.getTable(TableName.valueOf(TABLE_NAME));
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME));
    HFileOutputFormat2.configureIncrementalLoad(job,t,rl);
    System.out.println("start");
    job.waitForCompletion(true);
    if (job.isSuccessful()) {
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
    } else {

        result = -1;
    }
    return result;
}



public static void doBulkLoad(String pathToHFile, String tableName) {
    try {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.child.java.opts", "-Xmx1g");
        HBaseConfiguration.addHbaseResources(configuration);
        LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);


        //HTable hTable = new HTable(configuration, tableName);
        //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);

        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Admin admin = connection.getAdmin();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        //path, admin, table, region locator
        loadFfiles.doBulkLoad(new Path(pathToHFile),admin,table,regionLocator);


        System.out.println("Bulk Load Completed..");
    } catch(Exception exception) {
        exception.printStackTrace();
    }

根据评论中的要求,我在此处添加了表描述的输出,因为该表是通过Python Happybase API创建的,我不确定API默认设置了哪些选项标志...
``` {NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} {NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} ```

1
HBase批量加载默认情况下会追加数据,除非您为存储行的一个版本配置了表和列族。您能否在帖子中添加一下您是如何创建表的? - maxteneff
嘿,我使用Happybase API创建了表格,并添加了表格描述... 当我尝试在同一行键、列族和列描述符的组合中放入两个不同的值,然后从该行检索列时,我只得到了最后一个值。但是我希望得到类似于“Value1Value2”的东西,如果第一个put放入字符串Value1,第二个put放入Value2。 - Bierbarbar
1
你是如何检查在两个键之后只有一行版本的?如果在两个单独的批量加载期间尝试插入两个不同的键,会发生什么? - maxteneff
嗨,我回来看了一下,发现当我执行2个批量加载时,密钥的版本不同。但是,如果我将value0和value1放在同一个文件中,然后尝试对该文件进行批量加载,那么我只会得到一个版本,其中文件中的第一个值被覆盖...正如我在问题中所说,我希望hbase简单地连接值的字节的行为... - Bierbarbar
1个回答

1
在HFileOutputFormat2.configureIncrementalLoad()方法中http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html#line.408,使用PutSortReducer作为reducer。
在PutSortReducer.reduce()方法中http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/PutSortReducer.html,KeyValues被存储在一个TreeSet中,比较器仅比较键。这就是为什么只有一个值会被保留。
如果想要保留两个值,可以创建自己的reducer,基于PutSortReducer,在其中保留两个值。并将其设置为:

HFileOutputFormat2.configureIncrementalLoad(job,t,rl); job.setReducerClass(MyReducer.class);


是的,创建一个自定义的reducer可能会起作用,这样你就不会在这个批量加载中覆盖具有相同键的文件,但它并不能解决另一个问题,即我希望将文件中的数据添加到现有版本而不是创建一个新版本。 - Bierbarbar
1
覆盖现有版本是HBase的一种通用行为,而不是特定于批量加载的。要解决这个问题,在自定义的Reducer中,您可以从HBase读取数据并附加新值。 - Guobiao Mo
是的,我知道那是一种通用行为,但我想找到一种像追加函数一样的方法,它可以在区域服务器上读取数据并将输入字节与旧字节连接起来... - Bierbarbar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接