Apache Flink - 作业内自定义Java选项未被识别

8
我已经在flink-conf.yaml中添加了以下行:
env.java.opts: "-Ddy.props.path=/PATH/TO/PROPS/FILE"
当启动jobmanager(jobmanager.sh start cluster)时,我可以在日志中看到jvm选项确实被识别了。
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml

但是,当我运行flink作业(flink run -d PROG.JAR)时,System.getProperty("dy.props.path")返回null(并且在打印系统属性时,我看到它确实不存在。)

问题实际上是 - 我该如何设置系统属性,以便在flink作业的代码中可用?

1个回答

2
这个问题与Flink的运行时架构密切相关[1]
我知道你正在独立集群中运行你的作业。请记住,JobManager和TaskManager在不同的JVM实例中运行。你必须考虑每个代码块将在哪里执行。
例如,像map或filter这样的转换中的代码是在TaskManager上执行的。 你的入口类main方法中的代码是在命令行工具flink中执行的,当然它没有设置系统属性,因为它为作业提交生成了一个临时的(-d) JVM。
如果通过WebUI提交作业,则来自主方法的代码将在JobManager上执行,因此该属性将在那时设置。
总的来说,我更倾向于不鼓励通过系统属性传递程序参数,因为这是一种不好的做法。
下面是一个简单的例子:
我启动了:
  • 一个带有env.java.opts:"-Ddy.props.path=jobmanager"的JobManager
  • 一个带有env.java.opts:"-Ddy.props.path=taskmanager"的TaskManager
我的作业代码如下:
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(1 to 4)

    val prop = System.getProperty("dy.props.path")
    stream.map(_ => System.getProperty("dy.props.path") + "  mainArg: " + prop).print()

    env.execute("stream")
  }
}

当我通过 flink 工具提交代码时,输出结果如下:

taskmanager  mainArg: null
taskmanager  mainArg: null
taskmanager  mainArg: null
taskmanager  mainArg: null

当通过WebUI提交时,我会收到以下信息:
taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager
taskmanager  mainArg: jobmanager

那么,如何通过命令行使用jvm选项向fork jvm提交作业?我看到了yarn的选项,但独立集群呢? - huskywolf
在独立集群中,必须先生成任务管理器,然后再提交作业。您可以在启动任务管理器时设置JVM选项。 - Dawid Wysakowicz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接