在Spark Streaming中读取Hbase数据

3
我正在撰写一个项目,从Kafka接收数据并将其写入Hbase表中。因为我想知道记录的差异,所以需要先获取Hbase中具有相同行键的记录,然后对接收到的记录进行减法运算,最后将新记录保存到HBase表中。
一开始,我尝试使用newAPIHadoop从Hbase获取数据。以下是我的尝试:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

通过这种方式,我可以仅一次获取具有特定列族和列名的记录的值。所谓“仅一次”,是指每次启动我的spark流式应用程序时,这段代码将被执行并且我可以获取一个值,但它不会再执行了。由于我想每次从Kafka接收记录时都使用cf和列从HBase读取我的记录,所以这对我不起作用。
为解决这个问题,我将逻辑移动到foreachRDD()中,但不幸的是,sparkContext似乎不可序列化。我收到了类似“任务不可序列化”的错误。
最后,我发现还有另一种方法可以使用hbase.client HTable从hbase读取数据。因此,这是我的最终工作:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-secure")
    conf.set("hbase.zookeeper.quorum", "xxxxxx")
    conf.set("hbase.master", "xxxx")
    conf.set("hbase.zookeeper.property.clientPort", "xxx")
    conf.set(TableInputFormat.INPUT_TABLE, "xx")
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")

    val testTable = new HTable(conf, "testTable")
    val scan = new Scan
    scan.addColumn("cf1".getBytes, "test".getBytes)
    val rs = testTable.getScanner(scan)

    var r = rs.next()
    val res = new StringBuilder
    while(r != null){
      val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))

      res.append(tmp)
      r= rs.next()
    }
val res = res.toString

//do the following manipulations and return object (ImmutableBytesWritable, Put)
         ..............................
         .......................
          }

在主方法中,我使用上述方法在foreachRDD中使用saveAsNewAPIHadoopDataset方法将其保存到HBase中。

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))

这对我来说现在运行良好,但我对这个过程有疑问:
我猜想,对于RDD的每个分区,都会创建一个连接到HBase的连接。我想知道是否可能扩展我的应用程序。比如说,如果我在1秒钟内有超过1000条记录,看起来我的Spark Streaming中将建立1000个连接。
这是从HBase读取数据的正确方法吗?在SparkStreaming中读取数据的最佳实践是什么?还是Spark Streaming不应该读取任何数据,而只是设计用于将流式数据写入数据库。
提前致谢。
2个回答

3

经过一些学习,我为每个RDD分区创建了一个配置。在Spark Streaming官方网站上检查foreachRDD的设计模式。实际上,配置并不是连接,所以我不知道如何从现有的连接池中获取连接以获取和插入Hbase记录。


你是否已经使用Spark Streaming从HBase中读取数据?我只能通过为每个数据打开一个连接来读取它。有什么更好的方法吗? - zorkaya

0

foreachRDD 在各个执行器的 JVM 进程上执行。在 transferToHBasePut 方法中,至少可以获取 conf 的单例实例(即在使用现有的 jvm 进程或新的 conf 之前进行空值检查)。因此,这将减少与 Hbase 的连接数,使其等于 Spark 集群中生成的执行器数量。

希望这能帮到您...


谢谢您回答我的问题。我已经尝试了您提供的解决方案,将conf作为参数传递给方法transferToHBasePut。但正如您所说,foreach在每个执行器的JVM进程上执行,单例无法从驱动程序传递到工作节点。我认为这是因为配置不可序列化。最终我发现RDD上有一个名为foreachPartition的方法可以使用。该方法将保证连接每个RDD分区只建立一次。 - Frank Kong
我不确定你是如何让它工作的,Configuration类不支持序列化,因此不能在foreachPartition中使用。 - Ani

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接