使用Spark结构化流写入时捕获Kafka偏移量

3

我正在使用Spark 2.2上的Structured Streaming,将HDFS目录中的文件流式传输到Kafka主题。我想捕获我写入主题的数据的Kafka偏移量。

我正在使用

val write = jsonDF
.writeStream.format("kafka")
.option("checkpointLocation", Config().getString(domain + ".kafkaCheckpoint"))
.option("kafka.bootstrap.servers", Config().getString(domain + ".kafkaServer"))
.option("topic", Config().getString(domain + ".kafkaTopic"))
.start()

写入Kafka。

当我使用

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id) 
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})

为了捕获流的进度信息,检索到的信息与在Kafka中创建的偏移量没有关联。
我认为这是因为流提供的信息实际上是有关于我正在利用的文件流,而不是与写入Kafka相关的信息。
在Spark Structured Streaming中,是否有一种方法可以捕获写入Kafka时生成的偏移信息?
添加示例: 当我从源1运行3行数据后,刚创建主题,我得到: 运行1: 开始偏移量:null,结束偏移量:{"logOffset":0} 开始偏移量:{"logOffset":0},结束偏移量:{"logOffset":0}
 Kafka Says:
 ruwe:2:1
 ruwe:1:1
 ruwe:0:1

运行2;

  Start Offset: {"logOffset":0}, End offset: {"logOffset":1}
  Start Offset: {"logOffset":1}, End offset: {"logOffset":1}

 Kafka Says:
 ruwe:2:2
 ruwe:1:2
 ruwe:0:2

运行3:

  Start Offset: {"logOffset":1}, End offset: {"logOffset":2}
  Start Offset: {"logOffset":2}, End offset: {"logOffset":2}

 Kafka Says:
 ruwe:2:3
 ruwe:1:3
 ruwe:0:3

我随后用不同的来源运行相同程序的数据,结果如下:

  Start Offset: null, End offset: {"logOffset":0}
  Start Offset: {"logOffset":0}, End offset: {"logOffset":0}

  and of course Kafka continued to increment

这说明Spark报告的信息是基于源的。

我想知道在目标中创建了什么。

2个回答

1

使用Spark Structured Streaming,我们是否可以捕获写入Kafka时生成的偏移信息?

是的,在onQueryProgress中,您需要查看StreamingQueryProgress.sources,它是一个Array[SourceProgress]。它有两个字符串:startOffsetendOffset,这些都是JSON,您可以解析它们:

sparkSession.streams.addListener(new StreamingQueryListener {override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ???

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val source = event.progress.sources.headOption
    source.map(src => println(s"Start Offset: ${src.startOffset}, End offset: ${src.endOffset}"))
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
})

JSON的结构如下所示:

"startOffset" : {
  "topic-name" : {
    "0" : 1,
    "1" : 22,
    "2" : 419,
  }
},
"endOffset" : {
  "topic-name" : {
    "0" : 10,
    "1" : 100,
    "2" : 1000
  }
}

2
这似乎是针对源而不是目标(Kafka)报告偏移量。如果我继续从同一源运行,我的数字会增加,但当我从第二个源运行数据时,它会报告初始数字,表明这些不是报告的Kafka偏移量。 - SRuwe
如果您想要目标偏移量,请查看 SinkProgress.json - Yuval Itzchakov

1
阅读Spark结构流代码,特别是Kafka KafkaWriter、KafkaWriteTask和CachedKafkaProducer。 我们发现,Spark不会在回调函数中使用从KafkaProducer返回的offsets。他们定义的回调仅捕获异常。基于此,我认为在当前版本2.2中无法实现。他们提供的信息都围绕查询源而不是目标。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接