Spark Structured Streaming with HBase integration

We are doing stream processing on Kafka data collected from MySQL. Now, once all the analytics is done, I want to save the data directly to HBase. I have gone through the Spark Structured Streaming documentation but couldn't find any sink that works with HBase. The code I use to read the data from Kafka is below.
 import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.types._
 import spark.implicits._

 val records = spark.readStream.format("kafka")
   .option("subscribe", "kaapociot")
   .option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667")
   .option("startingOffsets", "earliest")
   .load

 val jsonschema = StructType(Seq(
   StructField("header", StringType, true),
   StructField("event", StringType, true)))

 val uschema = StructType(Seq(
   StructField("MeterNumber", StringType, true),
   StructField("Utility", StringType, true),
   StructField("VendorServiceNumber", StringType, true),
   StructField("VendorName", StringType, true),
   StructField("SiteNumber", StringType, true),
   StructField("SiteName", StringType, true),
   StructField("Location", StringType, true),
   StructField("timestamp", LongType, true),
   StructField("power", DoubleType, true)))

 val DF_Hbase = records.selectExpr("cast(value as string) as json")
   .select(from_json($"json", jsonschema).as("data")).select("data.event")
   .select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")

Now, finally, I want to save the DF_Hbase dataframe to HBase.

Please take a look at https://dev59.com/eFgR5IYBdhLWcg3wvPmM - Vignesh I
3 Answers


1- Add the following libraries to your project:

      "org.apache.hbase" % "hbase-client" % "2.0.1"
      "org.apache.hbase" % "hbase-common" % "2.0.1"

2- Add this trait to your code:

   import java.util.concurrent.ExecutorService
   import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
   import org.apache.hadoop.hbase.security.User
   import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
   import org.apache.spark.sql.ForeachWriter

   trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

     val tableName: String
     val hbaseConfResources: Seq[String]

     def pool: Option[ExecutorService] = None

     def user: Option[User] = None

     private var hTable: Table = _
     private var connection: Connection = _

     // Called once per partition/epoch: open the HBase connection and table handle.
     override def open(partitionId: Long, version: Long): Boolean = {
       connection = createConnection()
       hTable = getHTable(connection)
       true
     }

     def createConnection(): Connection = {
       val hbaseConfig = HBaseConfiguration.create()
       hbaseConfResources.foreach(hbaseConfig.addResource)
       ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
     }

     def getHTable(connection: Connection): Table = {
       connection.getTable(TableName.valueOf(tableName))
     }

     // Called for every record in the partition: convert it to a Put and write it.
     override def process(record: RECORD): Unit = {
       val put = toPut(record)
       hTable.put(put)
     }

     override def close(errorOrNull: Throwable): Unit = {
       hTable.close()
       connection.close()
     }

     // Implement this to map your record type to an HBase Put.
     def toPut(record: RECORD): Put

   }

3- Use it in your logic:

    import org.apache.hadoop.hbase.util.Bytes  // needed for Bytes.toBytes below

    val ds = .... //anyDataset[WhatEverYourDataType]

    val query = ds.writeStream
      .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
        override val tableName: String = "hbase-table-name"
        // your cluster config files, assumed here to be on the classpath (e.g. in resources)
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(record: WhatEverYourDataType): Put = {
          val key = .....
          val columnFamilyName: String = ....
          val columnName: String = ....
          val columnValue = ....

          val p = new Put(Bytes.toBytes(key))
          // add columns ...
          p.addColumn(Bytes.toBytes(columnFamilyName),
                      Bytes.toBytes(columnName),
                      Bytes.toBytes(columnValue))

          p
        }
      })
      .start()

    query.awaitTermination()
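
For the question's DF_Hbase DataFrame specifically, a minimal sketch of such a writer could look like the one below; the HBase table name, column family, and row-key layout are assumptions, not part of the original answer:

    // Sketch only: table name, column family and row key are assumed; adjust to your cluster.
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.Row

    val query = DF_Hbase.writeStream
      .foreach(new HBaseForeachWriter[Row] {
        override val tableName: String = "meter_readings"               // assumed table name
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(row: Row): Put = {
          // assumed row key: meter number + event timestamp
          val key = s"${row.getAs[String]("MeterNumber")}_${row.getAs[Long]("timestamp")}"
          val cf  = Bytes.toBytes("d")                                   // assumed column family
          val p   = new Put(Bytes.toBytes(key))
          p.addColumn(cf, Bytes.toBytes("power"), Bytes.toBytes(row.getAs[Double]("power")))
          p.addColumn(cf, Bytes.toBytes("SiteName"), Bytes.toBytes(row.getAs[String]("SiteName")))
          p
        }
      })
      .start()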

This is the right way to do it. You can also add more sophisticated logic to make the sink idempotent, in order to achieve exactly-once delivery semantics. - linehrr

This approach worked for me, even with pyspark: https://github.com/hortonworks-spark/shc/issues/205
package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // JSON string with the SHC table catalog (HBaseTableCatalog.tableCatalog)
  private val hBaseCatalog = options.getOrElse("hbasecat", "")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Rebuild the DataFrame from the underlying RDD so the batch writer accepts it
    // (a streaming DataFrame cannot be passed to df.write directly).
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

I added a file named HBaseSinkProvider.scala under shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase, built the project, and the example works perfectly.

Here is an example of how to use it (Scala):

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

inputDF
  .writeStream
  .queryName("hbase writer")
  .format("HBase.HBaseSinkProvider")
  .option("checkpointLocation", checkPointProdPath)
  .option("hbasecat", catalog)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(30.seconds))
  .start
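
The `catalog` value passed via the `hbasecat` option is SHC's JSON table catalog. A minimal sketch for the question's schema could look like this (namespace, table name and column family are assumptions):

// Sketch only: namespace, table name and column family are assumed.
val catalog = """{
  "table":   {"namespace": "default", "name": "meter_readings"},
  "rowkey":  "key",
  "columns": {
    "MeterNumber": {"cf": "rowkey", "col": "key",       "type": "string"},
    "SiteName":    {"cf": "d",      "col": "SiteName",  "type": "string"},
    "timestamp":   {"cf": "d",      "col": "timestamp", "type": "bigint"},
    "power":       {"cf": "d",      "col": "power",     "type": "double"}
  }
}"""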

And here is an example of how I use it in Python:

inputDF \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', catalog_kafka) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()

Please consider describing the solution here (rather than just linking to another site). "Self-contained" answers are preferred, because they don't depend on external resources that may change or disappear in the future, and they give users immediate access to the information they are looking for instead of opening yet another browser tab. - HAVB
Please provide a complete solution in pyspark, since many people say in the linked issue that it doesn't work. - saurabh shashank

Are you processing the data from Kafka, or just pumping it into HBase? An option worth considering is Kafka Connect. It gives you a configuration-file-based approach for integrating Kafka with other systems, including HBase.

I just checked, but I couldn't find an HBase connector. - Chris Snow
There are at least two here (I haven't tried them): https://www.google.com/search?q=Kafka+Connect+hbase&ie=utf-8&oe=utf-8&client=firefox-b-ab - Robin Moffatt
