Apache Flink:当我重新启动应用程序时,我的应用程序无法从检查点中恢复。

7
我有一个Flink工作任务,其中我正在从文件夹中读取文件并将其转储到数据库中。每天会有新的文件放入该文件夹中。
我启用了检查点,以便如果由于任何原因Flink任务停止并且我需要重新启动,则Flink任务不应读取已经读取的文件。
我在我的代码中添加了下面的行,但是当我重新启动我的任务时,Flink任务又会读取所有文件。
env.setStateBackend(new FsStateBackend("file:///C://Users//folder"));
env.enableCheckpointing(10L);
2个回答

10

检查点是一种机制,用于在应用程序执行期间从故障中恢复,而不是恢复显式取消的应用程序。

如果您有一个正在运行的应用程序,并且执行失败(无论出于什么原因),Flink将尝试通过重新启动它并从上一个检查点初始化操作符的状态来恢复该应用程序。如果恢复失败(例如因为没有足够的处理插槽可用),则任务被视为失败。

如果您手动取消应用程序并重新启动它,则Flink将不使用检查点来初始化操作符的状态。实际上,默认情况下,当您取消应用程序时,Flink将删除所有检查点。

您要寻找的概念是保存点。保存点与检查点非常相似,但由用户手动触发,并且在应用程序显式取消时不会自动删除。在启动应用程序时,可以从保存点开始,这意味着操作符状态将从保存点初始化。

还有不同的重新启动策略可用于配置Flink尝试多少次以及在哪些间隔内重新启动失败的应用程序。


请问你能帮我配置一下Savepoints吗? - Ankit
如果您的应用程序启用了检查点,那么就没有什么需要配置的了。您可以从CLI客户端或REST API触发保存点。 - Fabian Hueske
你可以终止任务管理器进程。直到JM有足够的处理插槽重新启动作业,该作业才能恢复。你需要一个备用TM进程或手动启动一个新进程。 - Fabian Hueske
@FabianHueske 在我的情况下,作业正在运行,然后我使用Intellij的IDEA重新运行配置按钮重新启动它。我收到的消息是'Process finished with exit code 130(interrupted by signal 2: SIGINT'。它再次运行后,应该从上一个检查点重新启动还是创建一个新的呢? - Alator

4

@fabian-hueske涵盖了“计划中”的重启发生的所有方面

您应该计划使用保存点来取消作业

flink cancel --withSavepoint ${SAVEPOINT_DIR} ${JOBID}

使用前一步骤的保存点重新启动新作业。

flink run -s ${SAVE_POINT} -p ${PARALLELISM} -d ${JOB_JAR} ${JOB_ARGS}

flink cancel -s ${SAVEPOINT_DIR} ${JOBID}取消 Flink 作业并使用指定的保存点目录和作业 ID。 - user544192
从版本1.9开始,使用flink stop [-p targetDirectory] [-d] <jobID>。 - PeterS

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接