Flink Table API无法将DataSet转换为DataStream

4

我正在使用Java编写Flink Table API,想要将DataSet转换为DataStream....以下是我的代码:

TableEnvironment tableEnvironment=new TableEnvironment();
Table tab1=table.where("related_value < 2014").select("related_value,ref_id");
DataSet<MyClass>ds2=tableEnvironment.toDataSet(tab1, MyClass.class);
DataStream<MyClass> d=tableEnvironment.toDataStream(tab1, MyClass.class);

但是当我尝试运行此程序时,它会抛出以下异常:
org.apache.flink.api.table.ExpressionException: JavaStreamingTranslator的根无效:Root(ArraySeq((related_value,Double), (ref_id,String)))。您是否尝试将基于DataSet的表格转换为DataStream或反之亦然?我想知道如何使用Flink Table API将DataSet转换为DataStream?
另一件我想知道的事情是,对于模式匹配,有Flink CEP库可用。但是是否可以使用Flink Table API进行模式匹配?

请不要在一个Stackoverflow问题中问多个问题。请另开一条线程来提出您的模式匹配问题。 - Fabian Hueske
2个回答

3

Flink的Table API不适用于将DataSet转换为DataStream或反之。使用Table API无法实现,目前也没有其他方法可以在Flink中完成该操作。

DataStreamDataSet API统一(处理批处理作为流式处理的特殊情况,即有限流)已列入Flink的长期路线图。


好的.....只是想知道Flink的Table API是否可以用于模式匹配或CEP? - Akki
1
请为此开一个新问题。Stackoverflow不是讨论的场所,它是一个问答服务。如果在同一主题下回答多个问题,其他人将无法找到答案。 - Fabian Hueske

0

当使用TableEnvironment时,您无法转换为DataStream API,您必须创建一个StreamTableEnvironment来从表格转换为DataStream,类似于以下内容:

final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(configuration, fsSettings);
DataStream<String> finalRes = fsTableEnv.toAppendStream(tableNameHere, MyClass.class);

希望能在某种程度上帮助到您。

此致敬礼!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接