我有一个Spark 2.0的应用程序,使用spark streaming(通过spark-streaming-kafka-0-10_2.11)从Kafka读取消息。
结构化流处理看起来非常酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它。
在常规流处理中,我使用kafkaUtils创建Dstrean,并在参数中传递值反序列化程序。
在Structured Streaming中,文档说我应该使用DataFrame函数进行反序列化,但我无法确切地理解这意味着什么。
我查看了一些示例,例如此示例,但是我的Avro对象在Kafka中相当复杂,无法像示例中的String一样简单地转换。
到目前为止,我尝试了以下类似的代码(我在另一个问题中看到的):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
我遇到了"data type mismatch: cannot cast BinaryType to StructType(StructField(...."的错误,该如何反序列化这个值呢?