Kafka和Flink重新启动时会出现重复消息问题。

7
首先,这与当我重新运行Flink消费者时,Kafka再次消耗最新消息非常相似,但并不完全相同。那个问题的答案似乎不能解决我的问题。如果我在那个答案中错过了什么,请重新表述答案,因为我显然错过了某些内容。
虽然问题是完全相同的-- Flink(kafka连接器)重新运行它关闭之前看到的最后3-9条消息。

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的代码

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

我的SBT依赖项

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的流程

(3个终端)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统没有错误时,我希望能够在不重新处理已成功完成流的消息的情况下打开和关闭 flink。

我的尝试修复

我添加了对 setStateBackend 的调用,认为默认的内存后端可能没有正确记住。但这似乎没有帮助。

我删除了对 enableCheckpointing 的调用,希望也许在 Flink 和 Zookeeper 中有一个单独的机制来跟踪状态。但这似乎没有帮助。

我使用了不同的 sinks,如 RollingFileSink、print(),希望可能是 kafka 中的 bug。但这似乎没有帮助。

我回滚到 flink(和所有连接器)v1.1.0 和 v1.1.1,希望最新版本中存在 bug。但这似乎没有帮助。

我添加了 zookeeper.connect 配置到属性对象中,希望关于它仅在 0.8 版本中有用的注释是错误的。但这似乎没有帮助。

我已经明确将检查点模式设置为 EXACTLY_ONCE (好主意drfloob),但似乎没有帮助。

我的请求

帮帮我!


1
只是为了好玩,尝试显式地设置EXACTLY_ONCE。env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) - drfloob
1
我有完全相同的问题,启动 flink 流作业后,同一个事件再次被消费。也许在保存检查点时,偏移量没有正确递增? - static-max
显式检查点模式没有起作用。我已经更新了帖子。不过,这是个好主意。 - mbarlocker
2个回答

10

(我已经在JIRA上发布了相同的回复,现在在这里跨发相同的内容)

根据您的描述,我假设您正在手动关闭作业,然后重新提交它,对吗?

除非您使用保存点(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则Flink不会在手动作业重启时保留完全一致性。 当作业失败并自动从之前的检查点恢复(启用检查点时,例如您使用env.enableCheckpointing(500))时,确切的一次保证才会生效。

实际上发生的是,Kafka消费者仅仅从ZK/Kafka中提交的现有位移开始读取数据,当您手动重新提交作业时,这些偏移量已经在第一次执行作业时提交到ZK/Kafka中。但是它们不适用于Flink的完全一致性语义;Flink内部使用检查点的Kafka偏移量来实现此功能。 Kafka消费者提交这些偏移量回ZK仅仅是为了向外界展示作业消费的进度(相对于Flink而言)。


嗨,Gordon,如果我更新/更改作业并重新提交它会怎样?为什么不能使用来自ZK的正确偏移量呢?似乎通过增加偏移量一可以很容易解决,或者我错过了什么?我不想在我的下游应用中使用命令行或添加额外的去重。或者我使用flink方式错误了吗? - static-max
就像我之前所说的,ZK中的偏移量只是为了暴露进度而被提交。Flink通过其内部的检查点机制实现了精确一次性操作;状态后端是正确的偏移量快照的存储位置。 提交到ZK的“外部”偏移可能不一定是正确的偏移,因为它们并不真正与Flink的检查点协作。 - Gordon Tai
如果您想更新/更改工作,通常这是流式作业的管理停机时间,并且Flink为此提供了保存点。在重新提交作业时,您可以提供先前的保存点作为起点。Flink还在进行支持“保存点和重新提交”作业的工作,以单个命令完成。 - Gordon Tai
1
还有一件值得指出的事情,以免我们在这里混淆:“精确一次”意味着 Flink 的用户/窗口状态中所有记录仅计算一次,而不是仅处理一次记录。如果发生故障,并且作业需要重播一部分数据,则 Flink 的检查点确保即使运算符的内部状态也被重放回来,看起来就好像没有包含要重播的数据所做出的任何更改。需要知道的是,Flink 的“精确一次”是重新播放数据和同时回滚内部状态的组合。 - Gordon Tai
1
我同意文档在这方面可以更具信息性。让我们为此打开一张工单吧 ;) - Gordon Tai
显示剩余3条评论

1

更新 2:我修复了偏移处理的错误,它已经合并到当前的主分支中。

更新:不是问题,在取消作业之前使用手动保存点(感谢 Gordon)

我检查了日志,似乎是偏移处理中的一个错误。我在https://issues.apache.org/jira/browse/FLINK-4618下报告了一个问题。我会在收到反馈后更新这个答案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接