在Java中如何创建有限生成的流?

54

在Java中,可以使用Stream.generate(supplier)轻松生成无限流。但是,我需要生成一个最终会结束的流。

比如说,我想要一个文件夹中所有文件的流。文件数量可能很大,因此我无法预先收集所有数据并从它们创建流(通过collection.stream())。我需要逐个生成序列。但是这个流显然会在某个时候结束,并且像collect()findAny()这样的终端操作符需要对其进行操作,因此Stream.generate(supplier)在这里不适用。

有没有什么合理简单的方法在Java中实现这一点,而无需自己实现整个Stream接口?

我可以想到一个简单的技巧-使用无限的Stream.generate(supplier)来做,当所有实际值被取走时提供null或抛出异常。但这将破坏标准流算子,我只能使用自己的算子,让它们知道这种行为。

澄清

评论中的人建议我使用takeWhile()运算符。这不是我的意思。如何更好地表达问题...我不是在问如何过滤(或限制)现有的流,而是在问如何创建(生成)流-动态地,无需预先加载所有元素,但流将具有有限的大小(事先未知)。

解决方案

我正在寻找的代码是

    Iterator it = myCustomIteratorThatGeneratesTheSequence();
    StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false);

我刚才查看了java.nio.file.Files,看一下list(path)方法是如何实现的。


1
你看过像IntStream.range这样的方法吗? - Javier Martín
1
openjdk 9 提供了 takeWhile() - Andrew Tobilko
让我们回到一个例子,其中一个流返回给定目录中的所有文件。您不能预先读取所有文件,然后使用collectionOfFiles.stream()创建流,因为可能有数百万个文件,这可能会耗尽内存。必须逐位读取文件列表,并逐渐将它们馈入流中,因为它们从另一端被消耗。就像Stream.generate()方法一样,只是该流不会无限制地生成。 - Jan X Marek
3
一种方法是使用这个答案https://dev59.com/12455IYBdhLWcg3wCPbh#17959135创建一个`Iterator<File>,然后使用这个答案https://dev59.com/-WAf5IYBdhLWcg3wlDUD#24511534将Iterator转换为Stream`。这样,迭代器只会在内存中保留少量文件,并且流将被惰性地评估。虽然这很麻烦。 - Paul Boddington
5
我强烈建议先检查Spliterator的逻辑是否更合适,然后再决定是否实现一个更复杂的Iterator并将其包装在一个Spliterator中。可以参考这个答案作为示例。 - Holger
显示剩余5条评论
4个回答

24
有没有一种合理简单的方法在Java中实现这个,而不需要自己实现整个Stream接口?
一个简单的.limit()保证它会终止。但这并不总是足够强大。
在Stream工厂方法之后,创建自定义流源的最简单方法是子类化java.util.Spliterators.AbstractSpliterator ,并将其传递给java.util.stream.StreamSupport.stream(Supplier>, int, boolean)
如果您打算使用并行流,请注意AbstractSpliterator仅产生次优分割。如果您对源具有更多控制,则完全实现Spliterator接口可能更好。
例如,以下片段将创建提供1,2,3 ...无限序列的Stream
在这个特定的例子中,你可以使用IntStream.range()
但是流显然会在某个时候结束,终端操作符(如collect()或findAny())需要对其进行处理。

findAny()这样的短路操作实际上可以在无限流上完成,只要存在任何匹配的元素。

Java 9引入Stream.iterate以生成一些简单情况的有限流。


0

尽管作者已经放弃了 takeWhile 选项,但我发现它对某些用例来说是足够的,并值得解释。

takeWhile 方法可用于任何流,并在提供给该方法的谓词返回 false 时终止流。导致 false 的对象不会附加到流中;只有导致 true 的对象会被传递到下游。

因此,生成有限流的一种方法可以是使用 Stream.generate 方法并返回一个值,该值通过由提供给 takeWhile 的谓词评估为 false 来标志流的结束。

这里有一个示例,生成数组的所有排列:

public static Stream<int[]> permutations(int[] original) {
    int dim = original.length;

    var permutation = original.clone();
    int[] controller = new int[dim];
    var low = new AtomicInteger(0);
    var up = new AtomicInteger(1);

    var permutationsStream = Stream.generate(() -> {
        while (up.get() < dim) {
            if (controller[up.get()] < up.get()) {
                low.set(up.get() % 2 * controller[up.get()]);

                var tmp = permutation[low.get()];
                permutation[low.get()] = permutation[up.get()];
                permutation[up.get()] = tmp;

                controller[up.get()]++;
                up.set(1);

                return permutation.clone();
            } else {
                controller[up.get()] = 0;
                up.incrementAndGet();
            }
        }

        return null;
    }).takeWhile(Objects::nonNull);

    return Stream.concat(
            Stream.ofNullable(original.clone()),
            permutationsStream
    );
}

在这个例子中,我使用了null值来表示流的结束。方法的调用者不会收到null值!
OP可以使用类似的策略,并将其与访问者模式相结合。
如果它是一个扁平目录,OP最好使用Stream.iterate,种子是要产生的文件的索引,Stream.limit是文件数量(可以在不浏览目录的情况下知道)。

0

这里有一个自定义且有限的流:

package org.tom.stream;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class GoldenStreams {
private static final String IDENTITY = "";

public static void main(String[] args) {
    Stream<String> stream = java.util.stream.StreamSupport.stream(new Spliterator<String>() {
        private static final int LIMIT = 25;
        private int integer = Integer.MAX_VALUE;
        {
            integer = 0;
        }
        @Override
        public int characteristics() {
            return Spliterator.DISTINCT;
        }
        @Override
        public long estimateSize() {
            return LIMIT-integer;
        }
        @Override
        public boolean tryAdvance(Consumer<? super String> arg0) {
            arg0.accept(IDENTITY+integer++);
            return integer < 25;
        }
        @Override
        public Spliterator<String> trySplit() {
            System.out.println("trySplit");
            return null;
        }}, false);
    List<String> peeks = new ArrayList<String>();
    List<String> reds = new ArrayList<String>();
    stream.peek(data->{
        peeks.add(data);
    }).filter(data-> {
        return Integer.parseInt(data)%2>0;
    }).peek(data ->{
        System.out.println("peekDeux:"+data);
    }).reduce(IDENTITY,(accumulation,input)->{
        reds.add(input);
        String concat = accumulation + ( accumulation.isEmpty() ? IDENTITY : ":") + input;
        System.out.println("reduce:"+concat);
        return concat;
    });
    System.out.println("Peeks:"+peeks.toString());
    System.out.println("Reduction:"+reds.toString());
}
}

0

使用Kotlin编写代码,从InputStream创建JsonNode流


    private fun InputStream.toJsonNodeStream(): Stream<JsonNode> {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(this.toJsonNodeIterator(), Spliterator.ORDERED),
                false
        )
    }

    private fun InputStream.toJsonNodeIterator(): Iterator<JsonNode> {
        val jsonParser = objectMapper.factory.createParser(this)

        return object: Iterator<JsonNode> {

            override fun hasNext(): Boolean {
                var token = jsonParser.nextToken()
                while (token != null) {
                    if (token == JsonToken.START_OBJECT) {
                        return true
                    }
                    token = jsonParser.nextToken()
                }
                return false
            }

            override fun next(): JsonNode {
                return jsonParser.readValueAsTree()
            }
        }
    }


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接