Apache Flink 检查点卡住。

4
我们正在运行一个作业,其中包含300GB到400GB的ListState,有时列表可以增长到数千个。在我们的用例中,每个项目必须具有自己的TTL,因此我们为此ListState的每个新项目创建一个新的定时器,并使用S3上的RocksDB后端。

目前大约有1.4亿个计时器(将在事件时间戳+40天触发)。

我们的问题是,突然作业的检查点会卡住,或者非常缓慢(比如几个小时内只有1%),直到最终超时。它通常会停止(flink仪表板显示0/12 (0%),而之前的行显示12/12 (100%)),原因是代码片段非常简单:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

      myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
        .getSideOutput(outputTag)
        .keyBy(_.name)
        .addSink(sink)
[...]

更多信息如下:

  • AT_LEAST_ONCE检查点模式似乎比EXACTLY_ONCE更容易卡住。
  • 几个月前,状态达到了1.5TB的数据和数十亿的计时器,没有任何问题。
  • 运行两个任务管理器的机器上的RAM、CPU和网络看起来正常。
  • state.backend.rocksdb.thread.num = 4
  • 第一次事故发生在我们收到大量事件(每分钟约有数百万个)时,但上一次没有发生这种情况。
  • 所有事件都来自Kafka主题。
  • 当处于AT_LEAST_ONCE检查点模式时,作业仍然正常运行和消耗。

这已经是我们第二次遇到拓扑结构运行正常,每天只有几百万个事件,然后突然停止检查点。我们不知道是什么原因导致的。

有人能想到是什么原因突然导致检查点卡住吗?

1个回答

4

几点想法:

如果您有许多计时器几乎同时触发,这些计时器的风暴将阻止其他任何事情发生 - 任务将循环调用 onTimer,直到没有更多计时器需要触发,在此期间它们的输入队列将被忽略,并且检查点障碍不会进展。

如果这是您遇到问题的原因,您可以向计时器添加一些随机抖动,以便事件风暴不会在以后变成计时器风暴。重新组织使用状态TTL可能是另一个选择。

如果堆上有大量计时器,这可能导致非常高的GC开销。这并不一定会使作业失败,但可能会使检查点不稳定。在这种情况下,将计时器移入RocksDB可能会有所帮助。

还有:由于您正在使用RocksDB,从ListState切换到MapState,以时间为键,可以让您在每次更新后删除单个条目而无需重新序列化整个列表。 (对于RocksDB中的每个MapState中的每个键/值对都是单独的RocksDB对象。)以这种方式使清理更有效可能是最好的解决方法。


嘿,感谢您的回答!我把问题定位在计时器上,因为我认为这可能是问题所在,很高兴看到您确认了这一点。那么我们将使用TTL重新实现它,由于我们正在使用RocksDB后端,我们考虑在其他配置中使用“cleanupInRocksDbCompactFilter”和“filter”为1_000。您能确认这是正确的选择吗?增加1_000会影响检查点时间吗? - Rocel
另外,MapState及其单独的RocksDB对象如何提高检查点时间和效率? - Rocel
1
如果我正确理解您当前的实现,每次计时器触发时,您都会修改列表以删除相应的元素。这需要对整个列表进行反序列化,然后重新序列化更新后的列表。对于成千上万条目,这将很慢。而使用MapState,所有必须发生的就是删除一个键/值对,这可以在不触及Map的其余部分的情况下完成(假设您使用计时器时间戳作为键)。现在,您正在进行英勇的工作来处理每个计时器触发,这会阻止检查点。 - David Anderson
我没有关于状态TTL的经验,我只会猜测。 - David Anderson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接