在无限流上调用 .map() 方法?

4
根据SE 8 Stream.map()的Javadocs,该方法执行以下操作:
返回一个流,其中包含将给定函数应用于此流元素的结果。
然而,在一本我正在阅读的网络编程书籍(《使用Java学习网络编程》,Richard M. Reese)中,对一个回声服务器实现了大致以下代码片段。
Supplier<String> inputLine = () -> {
    try {
        return br.readLine();
    } catch(IOException e) {
        e.printStackTrace();
        return null;
    }
};

Stream.generate(inputLine).map((msg) -> {
    System.out.println("Recieved: " + (msg == null ? "end of stream" : msg));
    out.println("echo: " + msg);
    return msg;
}).allMatch((msg) -> msg != null);

这是一种实现将用户输入打印到套接字输入流的功能性方法。它按预期工作,但我不太理解它的原理。是因为map知道流是无限的,所以它会在新的流令牌可用时惰性执行吗?似乎在map迭代的同时向集合中添加东西有点黑魔法。请有人帮我理解背后发生了什么。
以下是我重新表述内容以避免混淆的map使用方式。我认为作者试图避免无限循环,因为你无法跳出forEach。
Stream.generate(inputLine).allMatch((msg) -> {
        boolean alive = msg != null;
        System.out.println("Recieved: " + (alive ? msg : "end of stream"));
        out.println("echo: " + msg);

        return alive;
});

1
流始终是惰性的。然而,在map中使用副作用有点令人不适。 - RealSkeptic
3
这里他们应该使用“peek”而不是“map”。像这样使用“map”相当奇怪。 - Radiodef
顺便问一下,“将某物添加到集合中”是什么意思?我认为在这段代码中没有使用任何集合。 - Radiodef
3
@Radiodef 不,peek也不是为了这个而存在的,“此方法主要用于支持调试”。一个简单的for循环会是一种更不奇怪的编写方式。 - Andy Turner
1
@tdct 我猜实际上应该是 .takeWhile(Objects::nonNull).forEach(...) 而不是使用 filter。(尽管一般来说,我同意Andy的看法,使用循环更好。) - Radiodef
显示剩余2条评论
3个回答

3

流是惰性的。把它们看作是相互传递桶子的链中的工人。这种惰性表现在它们只有在前面的工人要求它们时才会向后面的工人请求下一个桶。

因此,最好将其视为最终操作allMatch - 因此是急切的 - 请求map流的下一个项,map流请求generate流的下一个项,并且generate流转到其供应商,并尽快提供该项。

allMatch停止请求项目时,它就会停止。并且在知道答案时这样做。此流中的所有项目都不为空吗?一旦allMatch接收到一个空项,它就知道答案是false,并且将完成并不再请求任何其他项。因为流是无限的,否则它将不会停止。

因此,你有两个因素导致它以这种方式工作 - 一个是allMatch急切地请求下一个项(只要前面的项不为空),另一个是generate流,为了提供下一个项,可能需要等待等待用户发送更多输入的供应商。

但应该说这里不应该使用map。在map中不应该有副作用 - 它应该用于将一种类型的项映射为另一种类型的项。我认为这个例子只是作为学习辅助工具使用的。更简单和直接的方法是使用BufferedReader的方法lines(),它可以给你一个来自缓冲读取器的有限Stream


有没有其他方法可以在函数式编程中实现与 map 相同的结果?我相信 forEach 是终端操作。因此,您可以使用带有条件断点的 forEach 以更清晰的方式实现这一点。 - tdct
1
不,它不能与forEach一起使用,因为除了异常之外没有停止它的方法。整个技巧基于allMatch可以根据接收到的项目停止的事实。您可以使用peek而不是map,但无论如何,严格来说,如果存在副作用,则它并不真正是函数式的。 - RealSkeptic
1
使用 peek 而不是 map 不会改变这段代码滥用未被保证的实现细节的事实。map 不仅不能保证只处理到第一个 null,而且不能保证按顺序处理它们。更糟糕的是,Stream.generate 生成的是一个无序流,因此即使是 Java 9 的 Stream.generate(…).takeWhile(…) 也不能保证做到所需的事情。 - Holger

2
是的 - Stream在执行终止操作(最终操作)之前,会被懒惰地设置。或者更简单地说:
只要您的流上的操作返回另一个流,您就没有终止操作,您可以继续链接,直到有返回任何不同于流的东西,包括void。
这是有道理的,因为为了能够返回除流以外的任何内容,您流中较早的操作需要被评估,以实际提供数据。
在这种情况下,根据文档,allMatch返回一个布尔值,因此需要执行最终操作来计算该布尔值。这也是您提供Predicate限制结果Stream的点。
还要注意,在文档中指出:
这是一个短路终端操作。点击链接以获取有关这些终端操作的更多信息,但终端操作基本上意味着它将实际执行该操作。此外,限制您的无限流是该方法的“短路”方面。

谢谢。只要你执行的方法不断返回另一个流,你就可以通过链接所有操作来懒惰地定义你想要做的事情。这真的是个完美的答案,真正澄清了我的问题!我必须选中realskeptic,因为他更深入地解释了,但我也非常感激这个回答。 - tdct

1
这里是文档中最相关的两个句子。你提供的代码片段是它们完美配合的例子:
  • Stream::generate(Supplier<T> s)表示它返回:

    返回一个无限连续的无序流,其中每个元素都由提供的Supplier生成。

  • Stream包文档的第三点

    寻求惰性。许多流操作,如过滤、映射或重复项删除,可以实现惰性计算,从而暴露出优化机会。例如,“查找具有三个连续元音字母的第一个字符串”不需要检查所有输入字符串。流操作分为中间(生成流)操作和终端(产生值或副作用)操作。中间操作总是惰性的。

简而言之,这个生成的流会等待进一步的元素,直到达到终端操作。只要在提供的 Supplier<T> 中执行,流管道就会继续。
例如,如果您提供以下Supplier,执行就没有停止的机会,将无限继续:
Supplier<String> inputLine = () -> {
    return "Hello world";
};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接