如何在使用Flink Runner的Apache Beam中执行检查点?

4

我正在从一个无界数据源(Kafka)读取数据,并将其词频写入到另一个Kafka主题中。现在,我想在Beam Pipeline中执行checkpoint。我已经按照Apache Beam文档中的所有说明进行了操作,但是checkpoint目录仍未创建。

以下是我用于Pipeline的参数:

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

有人可以帮我解决一下检查点吗?

2个回答

0

我知道这已经有点老了,但我想赞同你的答案。 我们在2019年构建了一个Docker化的Flink和Beam,并使用这些选项运行。

--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev

我们已经在conf.yml中配置了rocksdb作为后端。


0

我已经在解决方案上进行了工作,其中一种方法是您可以在link集群的flink-conf.yaml中更改checkpoint.state.dir路径,另一种方法是使用flinkPipelineOptions-

        @Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

通过设置setStateBackendFactory(我使用自定义类完成)

  static class  bakend implements FlinkStateBackendFactory{

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) {
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");

        }
    }

这将创建一个checkpointDir,您还需要设置checkpointinginterval的值以启用检查点。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接