我正在尝试创建一个消费ProtoBuf编码的Kafka消息的Spark Streaming。
以下是我过去几天尝试的内容。
以下是我过去几天尝试的内容。
import spark.implicits._
def parseLine (str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)
val storageLoc: String = "/tmp/avl/output"
val checkpointLoc: String = "/tmp/avl/checkpoint"
val dfStreamReader: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("failOnDataLoss", value = false)
.option("subscribe", topics)
.load()
val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)
val streamWriterAirline: StreamingQuery = dfRaw.writeStream
.format("parquet")
.option("path", storageLoc)
.option("checkpointLocation", checkpointLoc)
.outputMode(Append)
.trigger(ProcessingTime("2 seconds"))
.start()
spark.streams.awaitAnyTermination(20000)
我使用scalapb成功将一个二进制proto文件解码并转换成dataframe。但在数据流处理时,编译时出现了解析行时的异常:
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
>>>>>
scala.ScalaReflectionException: <none> is not a term
有人能给一些提示吗?