流如何停止?

11

当我使用Stream.generate创建自己的无限流时,我想知道在标准库中的流是如何停止的...

例如,当你有一个记录列表:

List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);

流不会无限制地运行下去,当列表中的所有项目都被遍历完时,它将停止。但是这是如何实现的呢?Files.lines(path)创建的流也适用相同的功能(来源:http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/)。

第二个问题是,使用Stream.generate创建的流如何以相同的方式停止?

2个回答

18

使用Stream.generate无法创建有限流。

实现流的标准方法是通过实现Spliterator,有时候使用迭代器。在任何情况下,实现都有一种报告结束的方式,例如当Spliterator.tryAdvance返回false或其forEachRemaining方法返回时,或者在Iterator源的情况下,当hasNext() 返回false时。

Spliterator甚至可以在处理开始之前报告预期的元素数量。

通过Stream接口中的一个工厂方法创建的流(例如Stream.generate)也可以实现为Spliterator或使用流实现的内部功能,但无论如何实现,您都无法接触此实现以更改其行为,因此使这样的流变得有限的唯一方法是将limit操作链接到流上。

如果要创建一个不由数组或集合支持且非空的有限流,并且没有任何现有的流源适用,则必须实现您自己的Spliterator创建一个流。如上所述,您可以使用现有方法将Iterator转换为Spliterator,但应抵制使用Iterator的诱惑,因为它很熟悉。实现Spliterator并不难:

/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }
    }, false);
}

从这个起点开始,您可以添加 Spliterator 接口的 default 方法的覆盖,权衡开发成本和潜在的性能改进,例如:

static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }

        /** May improve the performance of most non-short-circuiting operations */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            long toGo=remaining;
            remaining=0;
            for(; toGo>0; toGo--) action.accept(s.get());
        }
    }, false);
}

1
为什么要避免使用迭代器来定义Spliterator?例如,我刚刚看到BufferedReader.lines()使用这种方法来创建有限流。 - Juru
6
BufferedReader.lines()是一个很好的例子。看看next()方法和hasNext()方法的实现,以及它们在调用之间如何保持状态。相比之下,Spliterator更加直观,只需要一个方法:tryAdvance(Consumer<? super String> c) { String line=readLine(); if(line==null) return false; c.accept(line); return true; }就可以了。实现更简单(添加异常处理后仍然只有一半的代码大小),无需包装器... - Holger
1
@WillD:从使用Stream.parallel()的角度来看,它是安全的,因为它将正确地工作。 Spliterator本身不需要线程安全,因为流不会同时访问它。通过添加专用的trySplit实现,可以提高并行性能,但请注意,只需使用IntStream.range(0, count).mapToObj(i -> supplier.get())即可实现相同的结果。此答案的代码更像是解决更专业任务的模板。 - Holger

0
我已经为此创建了一个通用的解决方案。
public class GuardedSpliterator<T> implements Spliterator<T> {

  final Supplier<? extends T> generator;

  final Predicate<T> termination;

  final boolean inclusive;

  public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
    this.generator = generator;
    this.termination = termination;
    this.inclusive = inclusive;
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    T next = generator.get(); 
    boolean end = termination.test(next);
    if (inclusive || !end) {
      action.accept(next);
    }
    return !end;
  }

  @Override
  public Spliterator<T> trySplit() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public long estimateSize() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public int characteristics() {
    return Spliterator.ORDERED;
  }

}

使用非常简单:

GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
    ()  -> rnd.nextInt(),
    (i) -> i > 10,
    true
);

Stream<Integer> ints = StreamSupport.stream(source, false);

ints.forEach(i -> System.out.println(i));    

1
没有理由通过抛出“UnsupportedOperationException”来实现接口方法,因为有明确定义的回退值,例如,“estimateSize()”应该返回“Long.MAX_VALUE”表示“未知大小”,而“trySplit()”在不支持分割时应返回“null”。但是,如果您扩展“Spliterators.AbstractSpliterator”,甚至可以获得分割支持,而无需实现这些方法。 - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接