我们有一个长时间运行的Spark Structured Streaming查询,它从Kafka读取数据,并且我们希望在重启后这个查询能够从上次离开的地方继续读取。然而,我们已将
我们的基本查询如下:
我们看到检查点目录被正确创建,并且偏移文件中的偏移量符合我们的预期。
当我们重新启动时,会看到如下消息:
我们告诉查询从“
在Spark结构化流中,不允许为Kafka设置“
我们正在使用YARN上的Spark 2.1。
您有关于为什么不起作用或我们做错了什么的任何想法吗?
startingOffsets
设置为"earliest
",重启后我们看到查询又从Kafka主题的开头开始读取。我们的基本查询如下:
val extract = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server:port")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.load()
val query: StreamingQuery = extract
.writeStream
.option("checkpointLocation", s"/tmp/checkpoint/kafka/")
.foreach(writer)
.start()
我们看到检查点目录被正确创建,并且偏移文件中的偏移量符合我们的预期。
当我们重新启动时,会看到如下消息:
25-07-2017 14:35:32 INFO ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver
我们告诉查询从“
earliest
”开始,但文档说:
仅适用于启动新的流式查询,并且恢复始终会从查询离开的地方继续。
这难道不意味着重启应用程序会导致查询从离开的地方继续吗?在Spark结构化流中,不允许为Kafka设置“
group.id
”。请参见:请注意以下Kafka参数无法设置,Kafka源将抛出异常。
我尝试添加queryName
,以便在运行期间识别查询,但它没有任何效果。我们正在使用YARN上的Spark 2.1。
您有关于为什么不起作用或我们做错了什么的任何想法吗?
更新日志:
SparkSession
上是否设置了"spark.sql.streaming.checkpointLocation"
?你可以使用queryName
或checkpointLocation
选项在运行之间对查询进行检查点。此外,正如 @zsxwing 指出的那样,由于 Spark 使用 WAL 进行检查点,因此在重新启动后它将重新处理最后一个检查点批次。 - nonsleeprstartingOffsets
设置为latest
吗? - himanshuIIITiancheckpointLocation
,则会使用"spark.sql.streaming.checkpointLocation"
+queryName
作为检查点目录。我的WAL评论仍然相关。 - nonsleepr