Apache Flink: 由于类型擦除,函数的返回类型无法自动确定

7

我用Java编写了一个简单的程序,使用Flink框架,可以接受文件或文本作为输入,并使用flatMap函数打印所有单词。

这是我的代码:

        final ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);
        // show user defined parameters in the apache flink dashboard

        DataStream<String> dataStream;

        if(params.has("input")) 
        {
            System.out.println("Executing Words example with file input");
            dataStream = env.readTextFile(params.get("input"));
        }else if (params.has("host") && params.has("port")) 
        {
            System.out.println("Executing Words example with socket stream");
            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
        }
        else {
            System.exit(1);
            return;
        }

        DataStream<String> wordDataStream = dataStream.flatMap(
                (String sentence, Collector<String> out) -> {
                    for(String word: sentence.split(" "))
                        out.collect(word);
        });

        wordDataStream.print();

        env.execute("Word Split");  

但是当我使用以下命令运行它时:

bin/flink run -c Words FlinkExample-0.0.1-SNAPSHOT.jar --host localhost --port 9999

我遇到了以下错误:
程序出现以下异常:
函数'main(Words.java:32)'的返回类型由于类型擦除而无法自动确定。您可以通过在转换调用的结果上使用returns(...)方法或让您的函数实现'ResultTypeQueryable'接口来提供类型信息提示。
(第32行是指第二个DataStream的声明)
3个回答

15

我认为这个错误信息的简短描述已经很好了,但让我稍微扩展一下。

为了执行程序,Flink需要知道正在处理的值的类型,因为它需要对其进行序列化和反序列化。 Flink的类型系统基于描述数据类型的“TypeInformation”。当您指定函数时,Flink会尝试推断该函数的返回类型。在您的示例的FlatMapFunction中,传递给收集器(Collector)的对象的类型被推断为返回类型。

不幸的是,由于类型擦除,有些Lambda函数会丢失此信息,因此Flink无法自动推断类型。因此,您必须明确声明返回类型。

您可以按以下方式提供TypeInformation:

DataStream<String> wordDataStream = dataStream.flatMap(
    (String sentence, Collector<String> out) -> {
        for(String word: sentence.split(" "))
        out.collect(word); // collect objects of type String
    }
).returns(Types.STRING); // declare return type of flatmap lambda function as String

0

或者你可以创建一个函数类:

new FlatMapFunction<Input, Output>() {
  @Override
  public void flatMap(Input input, Collector<Output> collector) throws Exception {
    ...
  }
}

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接