我们正在运行一个作业,其中包含300GB到400GB的ListState,有时列表可以增长到数千个。在我们的用例中,每个项目必须具有自己的TTL,因此我们为此ListState的每个新项目创建一个新的定时器,并使用S3上的RocksDB后端。
目前大约有1.4亿个计时器(将在事件时间戳+40天触发)。
我们的问题是,突然作业的检查点会卡住,或者非常缓慢(比如几个小时内只有1%),直到最终超时。它通常会停止(flink仪表板显示0/12 (0%)
,而之前的行显示12/12 (100%)
),原因是代码片段非常简单:
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息如下:
- AT_LEAST_ONCE检查点模式似乎比EXACTLY_ONCE更容易卡住。
- 几个月前,状态达到了1.5TB的数据和数十亿的计时器,没有任何问题。
- 运行两个任务管理器的机器上的RAM、CPU和网络看起来正常。
state.backend.rocksdb.thread.num = 4
- 第一次事故发生在我们收到大量事件(每分钟约有数百万个)时,但上一次没有发生这种情况。
- 所有事件都来自Kafka主题。
- 当处于AT_LEAST_ONCE检查点模式时,作业仍然正常运行和消耗。
这已经是我们第二次遇到拓扑结构运行正常,每天只有几百万个事件,然后突然停止检查点。我们不知道是什么原因导致的。
有人能想到是什么原因突然导致检查点卡住吗?