Flink如何从集群GUI正确提交作业参数?

4
我的目标是通过群集GUI中的“程序参数”字段将args传递给Flink作业的Main()函数。enter image description here 并以某种方式(理想情况下是按键名)在Main()函数中访问它们,例如:
public static void main(String[] args) throws Exception {

    ParameterTool parameter = ParameterTool.fromArgs(args);

    CustomProps props = new CustomProps (DEFAULT_PROPERTIES_FILE);

    String kafkaAutoOffsetReset = props.getKafkaAutoOffsetReset();
    String cassandraClusterUrl = props.getCassandraClusterUrl();

    if (args.length == 1 && args[0] != null) {

        cassandraClusterUrl = parameter.get("cassandraClusterUrl");
        kafkaAutoOffsetReset = parameter.get("kafkaOffset");
    }

    //Other code...

}

我已经尝试过“ParameterTool”,但是我没有得到任何结果,如果我尝试像这样的东西:
kafkaAutoOffsetReset = args[0];

只有当我在“程序参数”字段中仅放置一个单词时,它才起作用。因此,如果我放置:
blah

它说它被设置为“blah”,但如果我尝试这些中的任何一个:
-kafkaOffset blah
--kafkaOffset blah
-kafkaOffset:blah
-kafkaOffset=blah

我什么也得不到。我知道在CLI中传递参数给jar的示例是:
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out

但似乎我错过了使用GUI进行操作的不同方法,而且我在寻找相关文档方面没有成功。
简而言之:
在Flink Cluster GUI中通过“程序参数”字段提交多个参数的正确方法是什么?在Main()函数中访问它们的正确方法又是什么?
提前感谢您的所有帮助!
3个回答

9

在Flink中,程序参数应该按照如下所示的方式提供:

--custom.key.one custom.value.one --custom.key.two custom.value.two

请注意,这些参数需要使用HTML标签保留原格式。

1
我已经理解了。以下是传递参数的方法: 在此输入图片描述

那么这是否意味着我们需要像这样传递它 - parameterName="parameterValue",例如 kAKFA_ENDPOINT="localhost" KAFKA_PORT="9092"? - Piyush_Rana
这里提供的解决方案都对我没用。我正在使用flink版本1.10.0并传递了4个参数。问题可能是什么?以下是异常信息:org.apache.flink.runtime.rest.handler.RestHandlerException: 期望只有一个值 [kafka.brokers="192.168.13.178:9092" kafka-group="group-1" kafka-topic="topic-1" kafka-start-position="latest"]。位于 org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter(HandlerRequestUtils.java:59) at org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter(HandlerRequestUtils.java:47) - misterbaykal
@misterbaykal我不确定最近版本中GUI读取参数的格式是否已更改,但从您的堆栈跟踪看来,我认为您的问题在于在将参数读入程序后对其进行的操作。它说“只期望一个值”,所以我猜测你的方法想要一个单一的值,而直接使用args时并不是方法想要的。尝试在代码中手动设置值并查看是否作为参数工作。如果可以,请切换到使用args填充该对象。 - Jicaar
@Jicaar 感谢您的反馈。我按照您的建议尝试了,但并没有起作用。当我从IDE运行代码时,使用args设置没有任何问题。但是当我将其部署到WebUI时,它会出现相同的错误。不知何故,WebUI需要以不同的方式处理某些内容。 - misterbaykal
我正在使用 "ParameterTool.fromArgs(args)"。 - misterbaykal
我通过将“,”更改为“-”来解决了我的问题。使用“-”定义参数会出错。正确的格式是:--brokers 192.168.80.160:32772-192.168.80.160:32773-192.168.80.160:32774 --group a-group --inTopic a-topic --outTopic b-topic。 - misterbaykal

0
通过POST方法将参数作为JSON数据体发送更简单且不容易出错,这是因为可以避免URL转义的问题。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/

一个示例的POST等效方法如下:

curl --location --request POST 'http://YOUR_REST_ENDPOINT/jars/YOUR_UPLOADED_JAR_ID/run' \
--header 'Content-Type: application/json' \
--data-raw '{
    "entryClass": "com.abc.YourMainClass",
    "parallelism": "4",
    "programArgs": "--yourFirstParam=1 --yourOtherCustomArg=something",
    "savepointPath": null,
    "allowNonRestoredState": null
}'

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接