如何将Apache Flink中的DataStream(源)转换为List?

3
我的问题是如何将一个 DataStream 转换为 List,例如为了能够遍历它。

代码看起来像这样:
package flinkoracle;

//imports
//....

public class FlinkOracle {

    final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);

    public static void main(String[] args) {
        LOG.info("Starting...");
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        //get the source from Oracle DB
        DataStream<?> source = env
                .createInput(JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("oracle.jdbc.driver.OracleDriver")
                        .setDBUrl("jdbc:oracle:thin:@localhost:1521")
                        .setUsername("user")
                        .setPassword("password")
                        .setQuery("select * from  table1")
                        .setRowTypeInfo(rowTypeInfo)
                        .finish());

        source.print().setParallelism(1);

        try {
            LOG.info("----------BEGIN----------");
            env.execute();
            LOG.info("----------END----------");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End...");
    }

}

非常感谢您的预先帮助。 敬礼 Tamas


你好Thomas,欢迎来到Stackoverflow,请提供你已经尝试过的内容。更多信息请参见此处 - Yanis-git
你好 Yanis,我尝试了类似这样的代码:source.flatMap(new Splitter()).keyBy(0).sum(1);public static class Splitter implements FlatMapFunction<String, Tuple2<String,String>> { @Override
public void flatMap(String sentence, Collector<Tuple2<String, String>>out)throws Exception { for (String word: sentence.split(",")) { LOG.info("FOR..."); out.collect(new Tuple2<String,String>(word,"")); } } }
- Thomas
1
当您迭代DataStream时,您能否更详细地阐述您想要实现的目标?最好的方法可能是使用DataStream API表达该逻辑。 - Yogi Devendra
2个回答

1

Flink提供了一个迭代器sink来收集DataStream的结果,以进行测试和调试。可以按照以下方式使用:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

您可以像这样将迭代器复制到一个新列表中:

while (iter.hasNext())
    list.add(iter.next());

Flink还为DataStream提供了一系列简单的write*()方法,主要用于调试目的。数据刷新到目标系统取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都会立即显示在目标系统中。请注意:这些write*()方法不参与Flink的检查点,并且在失败情况下,这些记录可能会丢失。
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket

来源: 链接

您可能需要添加以下依赖项才能使用DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>

嗨,Nizar,感谢你的帮助。我尝试导入org.apache.flink.contrib.streaming.DataStreamUtils;但我收到了错误消息:包不存在。我使用的是apache flink 1.4.2。我在文档中看到这是一个现有的包,所以我不明白为什么会出错。请帮忙。 - Thomas
我相信这个类已经被移动到了包:org.apache.flink.streaming.api.datastream。链接:https://github.com/apache/flink/blob/022cd628cea67ce8eba1d56987301d20f192f480/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java 请尝试导入:import org.apache.flink.streaming.api.datastream.DataStreamUtils。 - Nizar
不幸的是,根据NetBeans IDE,DataStreamUtils不包含在这个包中,所以它没有帮助。 - Thomas
@Thomas 已更新答案,加入所需的依赖项以使用命名空间:org.apache.flink.contrib.streaming.DataStreamUtils - Nizar
嗨,这很有帮助,但现在我遇到了两个导入的问题。 org.apache.flink.api.java.tuple.* 和 org.apache.flink.api.java.typeutils.RowTypeInfo; 包不存在错误消息。当然,我也添加了其他依赖项:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.4.2</version> </dependency> - Thomas
我已经解决了它,现在可以编译。但是以下代码不起作用:DataStream<Tuple4<String,String,String,String>> myResult = source; - Thomas

0
在更新的版本中,DataStreamUtils::collect已被弃用。相反,您可以使用DataStream::executeAndCollect,如果给定一个限制,它将返回最多该大小的List
var list = source.executeAndCollect(100);

如果您不知道有多少元素,或者只是想迭代遍历结果而不一次性将它们全部加载到内存中,那么您可以使用无参数版本来获取一个{{link2:CloseableIterator}}。

try (var iterator = source.executeAndCollect()) {
  // do something
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接