Kafka Protobuf的Spark结构化流处理

3
我正在尝试创建一个消费ProtoBuf编码的Kafka消息的Spark Streaming。
以下是我过去几天尝试的内容。
    import spark.implicits._
    def parseLine (str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)   
    val storageLoc: String = "/tmp/avl/output"
    val checkpointLoc: String = "/tmp/avl/checkpoint"
    val dfStreamReader: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("failOnDataLoss", value = false)
      .option("subscribe", topics)
      .load()

    val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))

    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))

    val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)

    val streamWriterAirline: StreamingQuery = dfRaw.writeStream
      .format("parquet")
      .option("path", storageLoc)
      .option("checkpointLocation", checkpointLoc)
      .outputMode(Append)
      .trigger(ProcessingTime("2 seconds"))
      .start()
    spark.streams.awaitAnyTermination(20000)

我使用scalapb成功将一个二进制proto文件解码并转换成dataframe。但在数据流处理时,编译时出现了解析行时的异常:

    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
    >>>>>
    scala.ScalaReflectionException: <none> is not a term

有人能给一些提示吗?

1个回答

1

更新:sparksql-scalapb现在能够为协议缓冲区推导编码器,以前使用UDT生成器的方法不再需要。指令在此处可用


旧答案(现在不相关):使用数据集时,Spark 会尝试为消息中的每个字段找到一个 SQL 类型。Spark 不知道如何处理 ScalaPB 枚举(它们表示为由 case 对象扩展的密封特征),因此会出现此错误。解决方法是注册枚举和用户定义类型。可以按以下方式完成:

  1. 将 sparksql-scalapb-gen 添加到您的 project/plugins.sbt(而不是主要的 build.sbt
libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb-gen" % "0.8.1"

检查上面的版本是否与您使用的sparksql-scalapb版本匹配。

  1. 将此生成器添加到build.sbt中的PB.targets
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)
  1. 重新生成源代码(可能需要执行 sbt clean,然后执行 sbt compile)。

  2. 在您的主函数中调用生成的注册函数。它应该是 mypackage.MyFileUdt.register()

参见:https://scalapb.github.io/sparksql.html#datasets-and-none-is-not-a-term

示例项目:https://github.com/thesamet/sparksql-scalapb-test


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接