Spark结构化流无法从Kafka偏移量重新启动

4
我们有一个长时间运行的Spark Structured Streaming查询,它从Kafka读取数据,并且我们希望在重启后这个查询能够从上次离开的地方继续读取。然而,我们已将startingOffsets设置为"earliest",重启后我们看到查询又从Kafka主题的开头开始读取。
我们的基本查询如下:
  val extract = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("startingOffsets", "earliest")
    .load()

  val query: StreamingQuery = extract 
    .writeStream
    .option("checkpointLocation", s"/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()

我们看到检查点目录被正确创建,并且偏移文件中的偏移量符合我们的预期。
当我们重新启动时,会看到如下消息:
25-07-2017 14:35:32 INFO  ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver

我们告诉查询从“earliest”开始,但文档说:

仅适用于启动新的流式查询,并且恢复始终会从查询离开的地方继续。

这难道不意味着重启应用程序会导致查询从离开的地方继续吗?
在Spark结构化流中,不允许为Kafka设置“group.id”。请参见:请注意以下Kafka参数无法设置,Kafka源将抛出异常。 我尝试添加queryName,以便在运行期间识别查询,但它没有任何效果。
我们正在使用YARN上的Spark 2.1。
您有关于为什么不起作用或我们做错了什么的任何想法吗?

更新日志:

来自驱动程序

来自工作人员


你的 SparkSession 上是否设置了 "spark.sql.streaming.checkpointLocation"?你可以使用 queryNamecheckpointLocation 选项在运行之间对查询进行检查点。此外,正如 @zsxwing 指出的那样,由于 Spark 使用 WAL 进行检查点,因此在重新启动后它将重新处理最后一个检查点批次。 - nonsleepr
你可以尝试将 startingOffsets 设置为 latest 吗? - himanshuIIITian
@himanshuIIITian - 使用最新的版本意味着如果未使用检查点目录,则在重新启动期间我将丢失消息。 - Patrick McGloin
@nonsleepr 我想你指的是Spark Streaming,而我们这里使用的是Spark Structured Streaming。 - Patrick McGloin
@PatrickMcGloin,我在谈论Structured Streaming。虽然没有记录,但如果查询中未设置checkpointLocation,则会使用"spark.sql.streaming.checkpointLocation" + queryName作为检查点目录。我的WAL评论仍然相关。 - nonsleepr
显示剩余12条评论
1个回答

0
首先,为什么你说检查点目录会再次创建。在初始运行后,您是否将其删除然后恢复它?
因此,只要清楚了".option("startingOffsets", "earliest")"设置将在您首次启动查询时从最开始读取。 考虑到某些情况下出现故障,流被停止。您修复它并重新启动流(而无需删除检查点目录),则应从先前停止的偏移量处开始流。
如果您已删除检查点目录然后恢复流,显然它将没有任何读取偏移量的历史记录(因为您已删除检查点),因此将从Kafka上可用的最早偏移量开始。

如果检查点中标记的偏移量在主题中不再可用,您需要修复它并重新启动流(不删除检查点目录)。流应该从先前停止的偏移量处开始。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接