Stream.parallel()使用新线程吗?

5

所以,我正在努力理解Java 8中引入的Stream API。我正在尝试创建一个可以在单独的线程上运行的流(仅供教育目的)。

String oracle = "http://www.oracle.com";
URL url = new URL(oracle);
BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));
in.lines().parallel().forEach(System.out::println);
System.out.print("CLOSING THE INPUT STREAM!, shouldnt this crash?");
in.close();

结果不是我所预期的(当另一个线程正在读取输入流时,我关闭了它,所以我预计会导致崩溃)。请注意.parallel()方法的调用。相反,代码似乎以顺序方式执行,没有任何问题。

<script language="JavaScript" src="http://www.oracleimg.com/us/assets/metrics/ora_ocom_hp.js"></script>
<!-- End SiteCatalyst code --> 

            <!-- SS_END_SNIPPET(fragment6,1)-->
<!-- SS_BEGIN_SNIPPET(fragment7,ui)-->          <!-- SS_END_SNIPPET(fragment7,ui)-->
</html>
CLOSING THE INPUT STREAM!, shouldnt this crash?

有人知道发生了什么吗?为什么我的代码没有崩溃?


2
如果您不想阻塞事物,可以使用 ForkJoinPool - Maroun
3个回答

13

并发流(parallel stream)确实尝试将读取行的工作分配到多个线程中。但是,调用本身是阻塞的,即该语句会等待所有线程完成后才继续执行下一条语句(在关闭输入流之前)。

需要注意的一点是,forEach不能保证并行操作执行与流元素相同的顺序,因此在这种情况下打印的行可能与原始网页的顺序不同(请参见 https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-)。


我明白,好答案。不过,如果你打算让当前的线程空闲下来,那为什么要创建一个新线程呢?.parallel()似乎并不那么有用。我猜这是一个不同的问题。尽快接受你的问题作为已解决。谢谢。 - frankelot
2
@feresr:关键在于,如果它生成了(比如)十个线程并行处理工作,那么整个操作可能比仅在一个线程中执行要快十倍。(实际上,由于缓存效应,它可能不会那么快,或者可能会更快,但这就是想法!) - psmears
2
使用并行流,执行速度可能会更快。您可以尝试将其与顺序流进行比较以测量差异。它仍然需要阻塞,因为forEach是一个终端操作,需要组合某个结果。您还可以调用sum操作,在这种情况下,它必须等待结果被组合。 - M A
1
@feresr 这篇文章提供了一个很好的例子。 - Maroun
我明白了,我的困惑来自于错误地认为 .parallel() 与 RxJava 中的 .subscribeOn 和 .observeOn 方法有些相似。现在我明白了,感谢大家的解答,这使得问题更加清晰易懂了。 - frankelot

2

如果您想在后台执行某些操作而不会立即阻塞其完成,可以使用java.util.concurrent.CompletableFuture.runAsync(Runnable)及其相关方法。它返回一个CompletableFuture,如果需要,稍后可以加入(join)。


1
正如已经提到的那样,平行流会阻塞当前线程直到所有平行任务完成。实际上,当前线程通常也用于执行一些工作,但如果它完成了它的部分,那么它就会等待其他线程(或者窃取一些它们的工作来帮助它们)。
然而,有一个特殊情况:如果平行流操作抛出异常,则您在主线程中的流处理将(异常地)结束,但其他后台线程仍可能继续处理某些输入块。您可以使用以下代码检查这一点:
// Create list of Strings "0", "1", "2", ..., "99"
List<String> list = IntStream.range(0, 100).mapToObj(String::valueOf)
                             .collect(Collectors.toCollection(ArrayList::new));
// replace one with non-numeric
list.set(1, "foo");

// Convert every string to number and print it
try {
    list.parallelStream().mapToInt(Integer::parseInt).forEach(System.out::println);
} catch (NumberFormatException e) {
    // well some non-number encountered
}
System.out.println("Exited");

运行此代码,您可能会偶尔看到在 "Exited" 消息后打印出一些数字。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接