在执行其他操作之前找到流大小

13
在我的程序中,我会反复地收集Java 8 ,以将一个对象集合缩减为单个对象。该集合的大小在执行过程中可能会有很大的变化:从3个对象到几百个对象不等。
public void findInterestingFoo(Stream<Foo> foos) {
    internalState.update(foos.collect(customCollector()));
}

在优化代码和寻找瓶颈的过程中,我将流parallel。当时这样做是有效的,因为集合都相当大。后来,在更改程序的其他部分和参数后,集合变得更小了。我意识到,让流并行更有效。这是有道理的:为4个对象分配工作到多个线程的开销太大了。然而,对于数百个对象,这是值得的。
如果我只能使大型流并行,那将非常方便:
public void findInterestingFoo(Stream<Foo> foos) {
    if (isSmall(foos)) {
        internalState.update(foos.collect(customCollector()));
    } else {
        internalState.update(foos.parallel().collect(customCollector()));
    }
}

当流是从数组, 集合手动创建时,可以手动执行此操作。也就是说,我们知道哪些元素进入流中,因此可以跟踪这些元素。然而,我有兴趣以通用的方式解决这个问题,以便无论传递给findInterestingFoo的流是什么类型,都可以适当地处理并尽可能高效地处理。
count()这样的东西可能会有所帮助,但它会在我collect之前终止流。
我很清楚流被设计成没有固定大小,特别是:
可能是无界的。虽然集合具有有限大小,但流不需要。短路操作,如limit(n)findFirst(),可以使对无限流的计算在有限时间内完成。 — java.util.stream包描述 尽管如此,我想知道是否有任何方法可以在执行任何操作之前确定流中有多少元素。流真的不知道它是从一个有限的集合创建的吗?

1
“一个流是否真的不知道它是由有限集合创建的?” - 你谈论的是一个接口,所以答案是:不,一个(通用的)Stream并不知道它是否来自(无)限源,因为它既没有定义isFinite()方法也没有定义size()方法。 - Turing85
在你的情况下,性能有多重要?对我来说,如果只涉及几百个元素,无论你是否并行处理,差别都微乎其微。 - Ole V.V.
@Turing85 当然可以。但据我所知,没有 SizedStream 接口或类似的东西。难道有吗?据我所知,由 Arrays.stream 创建的流只是 Stream - Just a student
1
如果这非常重要,可以添加一个参数并让调用者传递提示,以表明流是大的还是小的(或只是预计是大的或小的)。 - Ole V.V.
@OleV.V. 对于我的情况来说,那肯定可行。"这在Java 8的Stream接口中不可能"是我问题的完全可以接受的答案,我只是希望存在更好的方法 :-) - Just a student
显示剩余2条评论
1个回答

17

理论上,您可以像这样做:

public void findInterestingFoo(Stream<Foo> foos) {
    Spliterator<Foo> sp = foos.spliterator();
    long size = sp.getExactSizeIfKnown();// returns -1 if not known
          // or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
    internalState.update(
        StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
                     .collect(customCollector()));
}

spliterator() 是一个终止操作,消耗输入流,但您可以将 Spliterator 传递给 StreamSupport.stream 来构造完全具有相同属性的流。第二个参数已经说明了流是否应该是并行的。

从理论上说。

实际上,当前的流实现将返回不同的 Spliterator 实现,具体取决于流是否为并行。这意味着,如果在调用 spliterator() 之前原始流不是并行的,则将重新创建流作为并行流可能会导致无法进行并行处理的流。

然而,如果没有中间操作,例如直接传入从集合或数组创建的 Stream,它确实运行良好。

在调用 spliterator() 之前调用 parallel() 以获取可以并行运行但仍然可以按顺序运行的流,在许多情况下可行。但是,如果输入流中存在有状态的中间操作,例如 sorted(),它们可能被固定为并行运行,即使您按顺序进行 collect(反之亦然)。


另一个问题属于基本性质。元素数量实际上并不能说明是否会有并行处理的好处。这取决于每个元素的工作量,其不仅取决于终止的 collect 操作,还取决于进入方法之前已经链接到流的操作。即使您得出结论,收集器的工作量已经足够高,值得并行处理,但可能会发现传入的流具有类似于 skiplimit 或(在有序流上)distinct 的操作,这些操作通常在并行情况下运行更差,并且需要完全不同的阈值。

一个更简单的解决方案是让调用者决定,因为调用者知道流的大小和性质。您甚至不需要在方法签名中添加选项,因为调用者可以通过在将流传递给您的方法之前调用 parallel()sequential() 来做出决策,您可以通过简单地不更改模式来尊重这一点。


1
有趣的问题和答案(我错过了一天,感觉像永远)。如果可以的话,我有一个后续问题。如果调用API不知道或无法告诉何时使用并行处理,但是我知道。是否有一种方法可以告诉该流是否实际上正在使用skip或distinct,好吧,我想Distinct标志可以查询。但是例如skip或limit呢?有时我会遇到这些情况,如果我能查询这些标志,我就可以确定并行处理与否。如果有任何拼写错误,请原谅,因为我在手机上。 - Eugene
2
@Eugene,“DISTINCT”标志并不能告诉您链中是否存在昂贵的“distinct()”操作。流可能是自然不同的,因为源是一个“Set”。即使“IntStream.range(…)”产生不同的值并具有完美的并行性能。简而言之,没有办法确定。此外,还有其他源属性可能会影响并行性能,例如,我们可能会在文件的行上进行流处理。通常,假设并行处理效果不佳的主要原因在于调用方。另一端只知道额外的障碍。 - Holger
非常感谢您的出色回答!我稍作修改,如有不妥之处请随时更改。提醒一下,我打算给您的回答颁发奖励,因为它不仅回答了我的问题,还教会了我一些额外的知识。 - Just a student

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接