如何在没有isFinite()和isOrdered()方法的情况下安全地消耗Java Streams?

23

问题是Java方法是否应该返回 Collections还是Streams,Brian Goetz回答说,即使对于有限序列,通常应该首选Streams。

但是在我看来,当前从其他地方获取的Streams上许多操作无法安全执行,并且防御性代码保护不可能,因为Streams不会透露它们是否无限或无序。

如果并行处理对我想要执行的Stream操作造成问题,我可以调用isParallel()进行检查,或者使用sequential确保计算是串行的(如果我记得的话)。

但是,如果有序性或有限性(大小)与程序的安全性相关,则无法编写保护措施。

假设我使用实现此虚构接口的库:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // finite, ordered, sequential
    // IntStream.range(0, 100).boxed()
    // final AtomicInteger atomic = new AtomicInteger();
    
    // // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()) 

    // infinite, unordered, parallel
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel()
    
    // finite, ordered, sequential, should-be-closed
    // Files.lines(Path.path("coordinates.txt")).map(Integer::parseInt)
}

那么我可以在这个流上进行哪些操作,以编写正确的算法?

如果我希望将元素作为副作用写入文件,则似乎需要关注流是否并行:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

而且如果它是并行的,那么基于什么线程池使其并行?

如果我想对流进行排序(或其他非短路操作),我需要注意它是否为无限流:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

在排序之前,我可以设置一个限制,但是如果我期望一个未知大小的有限流,应该设置什么神奇的数字呢?

最后,也许我想要并行计算以节省时间,然后收集结果:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

但如果流没有被排序(在该库的版本中),那么由于并行处理,结果可能会变得混乱。但我该如何防范这种情况,除了不使用并行(这将破坏性能目的)之外?

集合明确了它们是有限的还是无限的,是否有顺序,并且它们不携带处理模式或线程池等信息。这些似乎是API的有价值属性。

此外,流有时需要关闭,但大多数情况下并非如此。如果我从一个方法中消费流(或者从一个方法参数中消费),我通常应该调用close吗?

此外,流可能已经被消费,因此最好能够优雅地处理这种情况,所以最好检查流是否已经被消费

我希望有一些代码片段可以用来验证关于流的假设,在处理流之前,像:

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )


1
我猜很多情况都是“这要看具体情况”。如果不算太宽泛的话,我会等待像Holger这样的人来回答。 - Naman
也许这篇文章可以回答你的一些问题:https://www.baeldung.com/java-stream-ordering - JavaMan
正如Brian在你发布的答案中所说,“必须返回集合的一种情况是存在强一致性要求”。要求它是有限的就是其中之一。 - daniu
@daniu:但他明确建议使用流来处理有限数据,这意味着对他而言,有限性并不是强一致性要求。 - tkruse
除非您假定方法的作者知道方法的所有用途(当前和未来),因此知道对客户端而言,有限性不是一致性要求。 - tkruse
显示剩余2条评论
1个回答

2
经过一些试验和这里的观察,据我所见,没有确切的方法可以知道一个流是有限的还是无限的。
更甚者,有时候甚至只能在运行时确定(比如在Java 11中:IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x)))。
你可以做的是:
您可以通过以下几种方式确定它是否有限(请注意,这些方法返回false并不意味着它是无限的,只是可能如此):
1. `stream.spliterator().getExactSizeIfKnown()` - 如果已知确切大小,则为有限,否则将返回-1。 2. `stream.spliterator().hasCharacteristics(Spliterator.SIZED)` - 如果是`SIZED`,则返回true。
您可以采取措施防范最坏情况(取决于您的情况)。
1. `stream.sequential()/stream.parallel()` - 显式设置首选消费类型。 2. 对于潜在的无限流,请在每种情况下假设最坏情况。
例如,假设您想要收听推文流,直到找到一个由Venkat发布的推文 - 这是一个潜在的无限操作,但您希望等待直到找到这样的推文。因此,在这种情况下,只需使用`stream.filter(tweet -> isByVenkat(tweet)).findAny()` - 它会迭代直到找到这样的推文(或永远)。
另一种场景,也可能是更常见的场景,是想对所有元素执行某些操作,或仅尝试一定次数(类似于超时)。为此,我建议在调用您的操作(`collect`或`allMatch`等)之前始终调用`stream.limit(x)`,其中`x`是您愿意容忍的尝试次数。
在这一切之后,我只想提到返回流通常不是一个好主意,除非有巨大的好处,我会尽量避免这样做。

.splititerator()是一个方法,不是公共字段。另外,你可以复制SIZED的检查来判断ORDERED吗?我认为在流处理时可以有一个元素计数器,即使对于可能无限的流,如果发出的元素超过了我最大预期的数量,它也可以抛出异常(当然会影响性能)。否则,到目前为止,回答很好。 - tkruse
spliterator - 正确的。ordered - 问题在于它只能在有限的情况下被排序,否则它将永远需要时间(例如,Stream.generate(random::nextInt).sorted()将会引起intellij警告),因此检查是否为有序有点多余。与其保持计数器并自行增加,为什么不使用 limit(x) 作为最大值呢? - orirab
限制并不告诉你是否还有更多。例如,在非常长且可能是无限的流上调用Max,抛出异常比返回错误的数字更安全。 - tkruse
我不太确定 - 这非常取决于您的用例,但我理解您的观点。 - orirab
如果这是一个合适的答案,您是否会考虑接受它? - orirab

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接