使用Spark 2.0.2(结构化流)从Kafka读取Avro消息

9

我有一个Spark 2.0的应用程序,使用spark streaming(通过spark-streaming-kafka-0-10_2.11)从Kafka读取消息。

结构化流处理看起来非常酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它。

在常规流处理中,我使用kafkaUtils创建Dstrean,并在参数中传递值反序列化程序。

在Structured Streaming中,文档说我应该使用DataFrame函数进行反序列化,但我无法确切地理解这意味着什么。

我查看了一些示例,例如此示例,但是我的Avro对象在Kafka中相当复杂,无法像示例中的String一样简单地转换。

到目前为止,我尝试了以下类似的代码(我在另一个问题中看到的):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

我遇到了"data type mismatch: cannot cast BinaryType to StructType(StructField(...."的错误,该如何反序列化这个值呢?


有人找到了可行的解决方案吗?以下任何一种对我都不起作用! - Achilleus
1
该库支持以Avro为有效负载的结构化流,并可能有所帮助:ABRiS(Spark的Avro桥)。它仍在开发中,但支持您的用例。披露:我为ABSA工作,也是该库的主要开发人员。 - Felipe Martins Melo
4个回答

4
如上所述,从Spark 2.1.0开始,批处理读取器支持avro,但SparkSession.readStream()不支持。以下是我基于其他答案在Scala中使其正常工作的方法。为简洁起见,我已简化了模式。
package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"},
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]) {

        val KafkaBroker =  args(0);
        val InTopic = args(1);
        val OutTopic = args(2);

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })

1
它对我没用,原因是:java.io.EOFException 错误。 - sri hari kali charan Tummala
2
这个解决方案对启用模式注册表的Kafka无效。它报告了“Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -13”错误。 - Casel Chen
1
请确保此处使用的Avro模式与Kafka中的模式完全匹配。 - Abdul Mannan

3
我还不太熟悉Spark在新的/实验性结构化流处理中如何进行序列化,但是下面的方法确实可行——尽管我不确定这是否是最好的方法(在我看来,这种方法有一种有点别扭的感觉)。
我将尝试通过一个自定义数据类型(这里是Foo case类),而不是特定的Avro,来回答你的问题,但我希望它能帮助你。思路是使用Kryo序列化来序列化/反序列化您的自定义类型,请参见Spark文档中的Tuning: Data serialization
注意:Spark支持通过内置(隐式)编码器对case类进行序列化,您可以通过import spark.implicits._导入。但是为了本例的简单起见,我们将忽略此功能。
假设您已经定义了以下Foo case类作为您的自定义类型(TL;DR提示:为了防止遇到奇怪的Spark序列化投诉/错误,您应该将代码放入单独的Foo.scala文件中):
// This could also be your auto-generated Avro class/type
case class Foo(s: String)

现在您有以下结构化流代码来从Kafka读取数据,其中输入主题包含Kafka消息,其消息值是二进制编码的String,您的目标是基于这些消息值创建Foo实例(即类似于如何将二进制数据反序列化为Avro类的实例):
val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

现在我们正在反序列化值为自定义Foo类型的实例,因此我们首先需要定义一个隐式Encoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))

回到您的Avro问题,您需要做的是:
  1. 为您的需求创建一个适当的编码器。
  2. Foo(new String(row.getAs [Array[Byte]](“value”))替换为反序列化二进制编码的Avro数据为Avro POJO的代码,即获取消息值中的二进制编码的Avro数据(row.getAs [Array[Byte]](“value”)),并返回一个Avro GenericRecord或其他您在其他地方定义的SpecificCustomAvroObject
如果其他人知道更简洁/更好/...的方法来回答Tal的问题,我很愿意倾听。 :-)
另请参阅:

我认为 Tal 的使用情况是他的主题上没有二进制编码的字符串,而是有二进制编码的 Avro。在这种情况下,使用 bijection-avro 会起作用吗? - zzztimbo
没错,@zzztimbo。我不得不稍微延迟这个项目,所以我还没有机会尝试任何新的东西。希望我能很快研究这个主题。当我开始时,我会研究双射-Avro。 - Tal Joffe
@TalJoffe 请告诉我你的想法。我正在尝试读取由kstream放置的avro,但bijection-avro对我来说行不通。 - zzztimbo
@zzztimbo 哦,好的。就像我说的,可能需要一些时间,但是一旦我弄清楚了,我会在这里通知你的。 - Tal Joffe

3

我公司的某位同事为我解决了这个问题,现在我在这里分享一下,便于以后的读者参考。

基本上,除了miguno建议的部分,我还错过了解码部分:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<YourObject>] = {
val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
iter.map(message => {
  val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
  val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
  val field2 = record.get("field1Name").asInstanceOf[GenericData.String]
        ...
  //create an object with the fields extracted from genericRecord
  })
}

现在你可以像这样读取来自Kafka的消息并对其进行解码:
val ds = spark
  .readStream
  .format(config.getString(ConfigUtil.inputFormat))
  .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
  .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
  .load()
  .as[KafkaMessage]

val decodedDs  = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

*KafkaMessage是一个简单的case class,它包含了从Kafka读取时得到的通用对象(key,value,topic,partition,offset,timestamp)。

AvroTo<YourObject>Decoder是一个类,它可以在给定模式注册表URL的情况下解码您的对象。

例如,使用Confluent的KafkaAvroDeserializer和模式注册表。

val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) //isKey = true

// Avro encoded values
valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) //isKey = false

从这些中,调用.deserialize(topicName, bytes).asInstanceOf[GenericRecord以获取一个avro对象。

希望这能对某些人有所帮助。


那你的意思是除了生成的Avro类之外,我们还需要提供相关的case class吗?你能给我们看一下你的导入语句吗?在这个语句中,"Decoder"类是从哪里获取的呢? val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl) - Casel Chen

2
请按照以下步骤操作:
  • 定义一个Kafka消息。
  • 定义一个消费者工具,返回一个YourAvroObject的数据集。
  • 定义你的逻辑代码。

Kafka消息:

case class KafkaMessage(key: String, value: Array[Byte],
                                    topic: String, partition: String, offset: Long, timestamp: Timestamp)

Kafka消费者:

import java.util.Collections

import com.typesafe.config.{Config, ConfigFactory}
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

import scala.reflect.runtime.universe._


object KafkaAvroConsumer {

  private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer")
  val valueDeserializer = new KafkaAvroDeserializer()
  valueDeserializer.configure(Collections.singletonMap("schema.registry.url",
    conf.getString("schema.registry.url")), false)

  def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = {
    val schema = new Schema.Parser().parse(schemaStr)
    Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value))
  }

  def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag]
  (schemaStr: String)
  (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = {

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of KafkaMessage from kafka
    val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option(subscribeType, topics)
      .option("startingOffsets", "earliest")
      .load()
      .as[KafkaMessage]
      .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform it Avro object.

    ds
  }

}

更新

工具:

import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificData

import scala.reflect.runtime.universe._

object Utils {


  def convert[T <: GenericRecord: TypeTag](targetSchema: Schema)(record: AnyRef): T = {
      SpecificData.get.deepCopy(targetSchema, record).asInstanceOf[T]
  }


}

1
你能否提供完整的示例或在Github上分享代码? - sri hari kali charan Tummala
@CaselChen,我已经添加了实用函数,希望它能帮到你。很抱歉,这是旧代码,我没有保留存储库。 - user2550587
@cricket_007,请了解一下模式注册表和模式演化。该模式不是硬编码的。 https://docs.oracle.com/database/nosql-11.2.2.0/GettingStartedGuide/schemaevolution.html - user2550587
我了解模式演化。那不是我的评论。我也知道模式注册表。我的问题是关于你的参数schemaStr...这应该是不必要的,因为KafkaAvroDeserializer类能够检测到Confluent的Avro编码使用的Magic Byte + Schema ID,然后针对该模式字符串在注册表中进行查找。换句话说,我认为应该可以不“硬编码”读取器模式字符串,并让它从注册表中动态提取。 - OneCricketeer
我猜我的问题是为什么你需要它,或者在这个例子中你会在哪里定义它?如果您使用客户端获取最新的向后兼容模式,我可以理解,但是那样您已经有了一个模式对象。没有必要解析字符串。 - OneCricketeer
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接