如何实现Java Stream?

43

我想实现一个 Stream<T>

我不想只使用 implements Stream<T>,因为我会不得不实现大量方法。

有没有办法避免这种情况?

更具体地说,我该如何对例如 t1t2t3 进行流操作:

class Foo<T> {
    T t1, t2, t3;

    Foo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }
}

11
Stream.of(t1, t2, t3) - Sotirios Delimanolis
5
Stream.builderStream.builder是Java 8中提供的一个用于构建Stream对象的工具类。这个类可以将多个元素添加到一个集合中,并根据这些元素创建一个新的Stream对象。使用Stream.builder的好处之一是可以在运行时动态地添加元素,而不需要在创建Stream对象之前就已经知道所有元素。 - fabian
5个回答

55
JDK中Stream的标准实现是内部类java.util.stream.ReferencePipeline,无法直接实例化。

相反,您可以使用java.util.stream.Stream.builder()java.util.stream.StreamSupport.stream(Spliterator<T>, boolean)和各种其他静态工厂方法来创建默认实现的实例。1, 2

使用Spliterator可能是最强大的方法,因为它允许您懒惰地提供对象,同时如果您的源可以分成多个块,则启用高效的并行处理。

此外,您还可以将流转换回分割器,将其包装在自定义分割器中,然后将其转换回流,如果您需要实现自己的有状态中间操作(例如由于标准API的缺陷),因为大多数可用的中间操作 不允许有状态。请参见此SO答案以获取示例。
原则上,您可以编写自己的流接口实现,但这将非常繁琐。

但是 Spliterator 不可关闭,而我的数据源却需要 :( - Mooing Duck

21

如果你想创建自己的流是因为你需要自定义 close() 逻辑,最简单的解决方案是从迭代器创建一个流,并调用 onClose(Runnable)。例如,通过Jackson从Reader流式传输:

MappingIterator<?> values = objectMapper.reader(type).readValues(reader);
return StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(values, Spliterator.ORDERED), false)
        .onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

17

通常情况下,您不需要编写自己的流类,而是可以通过现有的方法创建流。例如,以下是如何创建一个值为1到100的流:

  AtomicInteger n = new AtomicInteger(0);
  Stream<Integer> stream = Stream.generate(() -> n.incrementAndGet()).limit(100);

所以在这里我们创建了一个整数的无限流:1、2、3、……然后我们使用limit(100)来获取包含100个元素的流。

为了清晰起见,如果您想要一个(按固定间隔)生成整数的流,应该使用IntStream.range()。这只是一个示例,展示了如何使用Stream.generate()定义流,这使您具有更大的灵活性,因为它允许您使用任意逻辑来确定流的元素。


2
在这种情况下,IntStream.range(0,100)会更高效。 - the8472
1
这可以简写为Stream.generate(n::incrementAndGet).limit(100) - Nathan Villaescusa
如果你想让流从0开始,请使用n::getAndIncrement - Nathan Villaescusa

11

为了完整起见,由于我没有在SO上直接找到这个答案:如果您想将现有的Iterator转换为Stream(例如,因为您想连续生成元素),请使用以下方法:

StreamSupport.stream(
    Spliterators.spliterator(myIterator, /* initial size*/ 0L, Spliterator.NONNULL), 
    /* not parallel */ false);

我发现这有点难找,因为你需要了解 StreamSupport、Spliterators 和 Spliterator。

这很有用,谢谢!请注意,您所称之为“初始大小”的参数实际上是流的估计大小。我不太确定它的用途,但它似乎在流是否可并行化方面起着作用,我认为在此处传递0会完全排除并发性。而Spliterators.spliteratorUnknownSize(myIterator, Spliterator.NONNULL)则会使用Long.MAX_VALUE作为估计值。 - sqweek
正如文档所述:“只有在流管道的终端操作开始后,才会遍历、拆分或查询spliterator的预计大小”。因此,如果spliterator的底层迭代器包含的元素数量超过了size值,它们将不会被获取/收集。 - Cromax

11

其他人已经回答了如何提供通用的Stream实现。针对您的具体需求,只需要按照以下步骤操作:

class Foo<T> {

    T t1, t2, t3;

    Foo(T t1, T t2, T t3) {
        this.t1 = t1;
        this.t2 = t2;
        this.t3 = t3;
    }

    Stream<T> stream() {
        return Stream.of(t1, t2, t3);
    }
}

1
这个答案既简洁又符合惯用语(至少它模仿了标准库中的流方法)。所以我认为它应该排名更高。 - bobismijnnaam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接