使用Apache Flink从Web获取JSON元素

7
阅读了 Apache Flink 的几个文档页面(官方文档dataArtisans),以及 官方存储库 中提供的示例后,我看到他们经常使用已下载的文件作为流数据源,并始终连接到本地主机。
我正在尝试使用 Apache Flink 下载包含动态数据的 JSON 文件。我的意图是尝试将可以访问 JSON 文件的 URL 建立为 Apache Flink 的输入源,而不是使用另一个系统下载并使用 Apache Flink 处理已下载的文件。
是否可以使用 Apache Flink 建立此网络连接?
1个回答

5

您可以将要下载的URL定义为输入 DataStream ,然后从 MapFunction 内部下载文档。以下代码演示了这一点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");

inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));

        StringBuilder builder = new StringBuilder();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "\n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return builder.toString();
    }
}).print();

env.execute("URL download job");

我运行了示例代码,但它只运行一次并读取了整个文件。然而它不是流式的,我以为当JSON文件增加时它会继续读取。 - zt1983811
为此,您需要使用ContinuousFileMonitoringFunction。流本身并不意味着作业将无限期运行。只有当您拥有非有限源时才会发生这种情况。但在这种情况下,env.fromElements函数生成有限的流源。一旦此源到达其末尾,程序将终止。 - Till Rohrmann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接