Java中,顺序流在哪个线程中执行?

12

阅读有关流的文档时,我遇到了以下几句话:

  • ... 从行为参数中尝试访问可变状态会给您带来一个糟糕的选择……如果您不同步访问该状态,则存在数据竞争,因此您的代码已经失效……[1]

  • 如果行为参数确实具有副作用……没有保证在同一流管道中对“相同”元素的不同操作在同一线程中执行。[2]

  • 对于任何给定的元素,动作可以在库所选择的任何时间和任何线程中执行。[3]

这些语句没有区分顺序流和并行流。所以我的问题是:

  1. 顺序流的管道在哪个线程中执行?它总是调用线程还是实现可以选择任何线程?
  2. 如果流是顺序的,forEach终端操作的操作参数在哪个线程中执行?
  3. 在使用顺序流时是否需要使用任何同步?


所有的问题都适用于ParallelStream。 - Ramandeep Nanda
如果您需要在调用线程上按顺序处理流元素,请使用Stream.iterator() - morgwai
2个回答

3
所有这些都归结于规范所保证的内容,以及当前实现可能具有超出保证的其他行为。
Java语言架构师Brian Goetz在相关问题中发表了关于规范的相关观点:
“规范的存在是为了描述调用者可以依赖的最小保证,而不是描述实现的内容。”
“当规范说‘不会保留属性X’时,它并不意味着属性X永远不会被观察到;它意味着实现没有义务保留它。 (HashSet不能保证迭代其元素将保留它们插入的顺序,但这并不意味着这不可能偶然发生 - 您只能不能指望它。)”
这意味着即使当前实现具有某些行为特性,也不应依赖它们或假设它们不会在库的新版本中更改。

顺序流管道线程

“一个顺序流的管道在哪个线程中执行? 它总是调用线程还是实现可以自由选择任何线程?”
目前的流实现可能使用调用线程,也可能使用一个或多个线程。 因为API未指定任何内容,因此不应依赖此行为。

forEach执行线程

“如果流是顺序的,那么forEach终端操作的操作参数在哪个线程中执行?”
虽然当前实现使用现有线程,但不能依赖它,因为文档指出线程的选择取决于实现。 实际上,没有保证元素不会由不同的线程处理不同的元素,尽管这不是当前流实现所做的。
根据API:
“对于任何给定的元素,可以在库选择的任何时间和任何线程中执行操作。”
请注意,虽然在讨论遇到顺序时,API专门调用并行流,但是Brian Goetz明确了该行为的动机,并不是特定于并行流的任何行为:
“在此明确指出并行情况的意图是教学[...]。但是,对于不了解并行性的读者,几乎不可能不假设forEach将保留遇到顺序,因此添加了这个句子以帮助澄清动机。”

使用顺序流进行同步

“当使用顺序流时,我必须使用任何同步吗?”

当前实现可能会正常工作,因为它们对于顺序流的forEach方法使用单个线程。然而,由于流规范不能保证这一点,因此不应该依赖它。因此,应该像多个线程调用这些方法一样使用同步。

话虽如此,流文档明确建议不要使用需要同步的副作用,并建议使用可减少操作代替可变累加器:

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String>results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!

这是我当时也发现的引用摘要。我也得出结论,这是未指定的行为,不同的Stream实现可能不使用调用线程来执行管道。尽管你最后一段提到了反模式,但并不总是可能防止副作用。毕竟,Stream的一个用例是处理可能太大而无法放入内存的数据。由于没有人能提供更好的答案(给出引用),我相信这个答案是正确的。 - Alex R
1
是的,他们甚至特别指出forEachpeek是通过副作用来操作的,只是应该“小心使用”。 - M. Justin
@AlexR 是的,说实话,我也更喜欢一些来自权威人士口中的东西,而不是一堆引用和规格说明,这些并没有明确排除行为。 - M. Justin
另一个微妙之处是:“对于表现良好的流源,可以在终端操作开始之前修改源,并且这些修改将反映在涵盖的元素中。所有从JDK集合和大多数其他JDK类返回的流都以这种方式表现良好。”这减轻了在执行之前在当前线程中进行的写操作不会被可能在不同线程上执行的管道中的内容看到的担忧... 当然,这不适用于“表现不良”的流... - undefined

1
  1. Terminal operations in Stream are blocking operations. If there is no parallel execution, the thread executing the terminal operation runs all operations in the pipeline.

Definition 1.1. Pipeline refers to a sequence of chained methods.

Definition 1.2. Intermediate operations can be located anywhere in the stream except at the end. They return a stream object and do not execute any operation in the pipeline.

Definition 1.3. Terminal operations can only be located at the end of the stream. They execute the pipeline and do not return a stream object, so no other Intermediate operations or Terminal operations can be added after them.

  1. From the first solution, we can conclude that the calling thread will execute the action method inside the forEach terminal operation on each element in the calling stream.
Java 8为我们引入了Spliterator接口。它具备Iterator的能力,同时还有一组操作来帮助并行执行和分割任务。在顺序执行中,当从原始流中调用forEach时,调用线程将调用Spliterator.forEachRemaining方法。
@Override
public void forEach(IntConsumer action) {
   if (!isParallel()) {
        adapt(sourceStageSpliterator()).forEachRemaining(action);
    }
    else {
        super.forEach(action);
    }
}

您可以在我的教程中了解更多关于Spliterator的内容:第6部分 - Spliterator

  1. 只要您不在流操作中的多个线程之间更改任何共享状态(这是被禁止的-很快就会解释),您在运行并行流时就不需要使用任何其他同步工具或算法。

像reduce这样的流操作使用accumulatorcombiner函数来执行并行流。根据定义,流库禁止突变。您应该避免它。

并发和并行编程中有很多定义。我将介绍一组对我们最有用的定义。

定义8.1. 并发编程是使用额外的同步算法解决任务的能力。

定义 8.2. 并行编程 是解决任务而不使用额外同步算法的能力。

您可以在我的教程中了解更多信息:第7部分 - 并行流


4
谢谢您的回答。请您提供您陈述的来源,特别是关于“执行终端操作的线程会运行管道中的所有操作”和“调用线程将执行action方法”的说法。代码示例并没有太大帮助,因为实现可能会在未来发生改变(即使我看不到为什么它应该改变)。 - Alex R
1
这个单一元素只是一个例子。假设我有一百万个这样的操作元素。将这些元素传递给其他线程,使用顺序流的forEach()方法是否是有效的行为? - Alex R
4
抱歉,我认为你是错误的。我认为,顺序流和并行流之间的区别在于管道的执行方式,而不是执行的位置(请参见流包概述)。“forEach”方法的文档清楚地说明:“...操作可以在库选择的任何线程中执行。” Brian Goetz在这个答案的最后一段明确表示,“在这里明确调用并行情况是教学上的”。 - Alex R
1
这就是我感到困惑的原因 :-) 他们谈论并行流,然后句子就结束了。Brian的帖子指出,这个句子的意图只是为了向不了解并行性的人澄清该方法的行为。他甚至说:“即使完全删除该句子,规范仍然非常清晰”。无论如何,感谢您的耐心!我认为我们应该在这里结束讨论。遗憾的是,我还不能使用聊天功能。 - Alex R
1
你的教程中引用的这里有一个错别字:应该是“concurrent”,而不是“cuncurrent”。 - M. Justin
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接