我正在使用Spark 2.2上的Structured Streaming,将HDFS目录中的文件流式传输到Kafka主题。我想捕获我写入主题的数据的Kafka偏移量。
我正在使用
val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()
写入Kafka。
当我使用
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
为了捕获流的进度信息,检索到的信息与在Kafka中创建的偏移量没有关联。
我认为这是因为流提供的信息实际上是有关于我正在利用的文件流,而不是与写入Kafka相关的信息。
在Spark Structured Streaming中,是否有一种方法可以捕获写入Kafka时生成的偏移信息?
添加示例: 当我从源1运行3行数据后,刚创建主题,我得到: 运行1: 开始偏移量:null,结束偏移量:{"logOffset":0} 开始偏移量:{"logOffset":0},结束偏移量:{"logOffset":0}
Kafka Says:
ruwe:2:1
ruwe:1:1
ruwe:0:1
运行2;
Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
Start Offset: {"logOffset":1}, End offset: {"logOffset":1}
Kafka Says:
ruwe:2:2
ruwe:1:2
ruwe:0:2
运行3:
Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
Start Offset: {"logOffset":2}, End offset: {"logOffset":2}
Kafka Says:
ruwe:2:3
ruwe:1:3
ruwe:0:3
我随后用不同的来源运行相同程序的数据,结果如下:
Start Offset: null, End offset: {"logOffset":0}
Start Offset: {"logOffset":0}, End offset: {"logOffset":0}
and of course Kafka continued to increment
这说明Spark报告的信息是基于源的。
我想知道在目标中创建了什么。
SinkProgress.json
。 - Yuval Itzchakov