为什么Java Stream生成器是无序的?

10

我试图使用Java Streams并行化一些工作。让我们考虑这个简单的例子:

Stream.generate(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return generateNewInteger();
        }
    })
    .parallel()
    .forEachOrdered(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });
问题在于它不调用forEachOrdered方法,只有在使用forEach方法时才有效。我想问题的原因是Stream.generate内部创建了一个没有ORDERED特性的InfiniteSupplyingSpliterator
问题是为什么?似乎我们知道数据生成的顺序。第二个问题是如何在并行流中对生成的流元素进行forEachOrdered操作?

3
它是无序的,因为规范是这样规定的。它没有完成的原因是它是无限的,再加上实现细节的影响。 - Holger
2
使用 lambda 表达式可以更加简洁地书写这个内容。 - David Conrad
无关紧要,但您可以使用.forEachOrdered(System.out::println) - tobias_k
1个回答

11
最简单的答案是,Stream.generate 是无序的,因为它的规范如此规定。
这并不像实现在尝试按顺序处理项一样,事实上恰恰相反。一旦将操作定义为无序操作,实现就会尽可能从无序性中获得好处。如果您在无序操作中看到类似源顺序的东西,则可能没有办法从无序处理中获得好处或实现尚未利用所有机会。因为这可能会在未来的版本或替代实现中发生变化,所以如果操作已被指定为无序,则不能依赖顺序。
Stream.generate定义为无序的意图可能会在与有序的Stream.iterate进行比较时变得更清晰。传递给iterate的函数将接收其先前元素,因此元素之间存在先前-随后的关系,因此具有顺序性。当仅考虑函数签名时,传递给Stream.generate的供应商不会接收先前元素,换句话说,没有与前一个元素的关系。这适用于Stream.generate(() -> constant)Stream.generate(Type::new)等用例,但对于Stream.generate(instance::statefulOp)似乎不是预期的主要用例。如果操作是线程安全的,并且您可以接受流的无序性,则仍然可以工作。你的示例永远不会取得进展的原因是,forEachOrdered的实现实际上并没有考虑到无序性质,而是试图按照遇到的顺序处理分割后的块。也就是说,所有子任务都尝试缓冲它们的元素,以便在左边的子任务完成后将它们传递给操作。当然,缓冲和无限源不太搭配,特别是由于底层的InfiniteSupplyingSpliterator将分裂成自己就是无限的子任务。原则上,有一个最左边的任务可以直接向操作输入其元素,但该任务似乎在队列中某处等待被激活,这将永远不会发生,因为所有工作线程都已忙于处理其他无限子任务。如果你让它运行足够长的时间,整个操作最终将以OutOfMemoryError的形式崩溃。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接