Java 8中是否可以复制流?

82

有时候我想在一个流上执行一系列操作,然后用其他操作以两种不同的方式处理结果流。

我能否无需重复指定共同的初始操作而实现这个目的?

例如,我希望存在一个dup()方法,如下所示:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10

2
我确实意识到流惰性计算不会有任何性能提升;我只是希望避免重复代码。 - necromancer
1
定位代码中的变化部分并将其提取到变量中。然后创建一个方法来提取可重用的代码片段并将变量应用于其中。 - d1e
@Elazar 这样做既不会高效利用内存,也无法处理无限流! - necromancer
2
没有更多的信息,无法复制一般的无限流。 - Elazar
@necromancer 我试了一下。 - Elazar
显示剩余3条评论
11个回答

65

以这种方式无法复制流。然而,您可以通过将公共部分移入方法或 lambda 表达式来避免代码重复。

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

3
我正在考虑将采纳答案更改为Elazar的答案,并将您的答案链接作为第二个解决方案的一个很好的示例,以及针对我在问题中使用的特定例子的解决方案。希望这样可以。谢谢! - necromancer
12
谢谢您的提问。请随意更改所接受的答案。 - nosid

46

一般情况下不可能。

如果要复制输入流或输入迭代器,有两个选择:

A. 将所有内容保留在集合中,例如List<>

假设您将一个流复制到两个流s1s2中。 如果您已经在s1中推进了n1个元素并且在s2中推进了n2个元素,则必须在内存中保留|n2-n1 | 个元素才能保持同步。 如果您的流是无限的,则可能不存在所需存储的上限。

查看Python的tee()以了解需要什么:

这个迭代工具可能需要大量的辅助存储(取决于需要存储多少临时数据)。通常,如果一个迭代器在另一个迭代器开始之前使用了大部分或全部数据,则使用list()而不是tee()更快。

B. 在可能的情况下:复制创建元素的生成器的状态

为使此选项有效,您可能需要访问流的内部工作方式。换句话说,生成器 - 创建元素的部分 - 应首先支持复制。 [OP:请参阅此绝妙答案,以了解如何针对问题中的示例执行此操作]

它不适用于来自用户的输入,因为您将不得不复制整个“外部世界”的状态。 Java的Stream不支持复制,因为它被设计为尽可能通用; 例如,用于处理文件、网络、键盘、传感器、随机性等[OP:另一个例子是按需读取温度传感器的流。它不能复制,而不存储读数的副本]

不仅仅在Java中是这种情况;这是一个普遍的规律。你可以看到,在C++中,std::istream 只支持移动语义,而不支持复制语义(“复制构造函数(已删除)”),出于这个原因(和其他原因)。


2
+1 个了不起的答案;很可能会接受并链接到当前已被接受的答案作为 "B 点" 的具体例子。 - necromancer
3
一个阻塞队列可能是解决有限存储问题的方法,其中第一个流的读取器将被阻塞,直到第二个流被消耗。 当然,并非总是适用,但对于某些使用情况,特别是具有大缓冲区的情况,可能会起作用。 - necromancer
1
请注意,尽管我不认为这经常实用,但您可能能够压缩n2-n1个元素。 - Elazar
1
当我遇到这种问题时(由于我相对较新于Java.streams,所以并不经常遇到!),我的第一反应是选择选项A。然而,我总觉得这样做不太妥当,好像我们正在走出流的世界,然后再次回到其中...只是为了复制。
认为我更喜欢选项B。我在我的项目中使用它,看起来它正在按预期工作。
表面上看,处理可能需要重复,我想这更多是Streams API的作者的问题?
- Bill Naylor

9

如果您正在缓冲元素,并且已经在一个副本中使用了这些元素,但在另一个副本中尚未使用,则可能会出现此情况。

我们在jOOλ中实现了流的duplicate()方法,这是一个开源库,我们创建它来改进jOOQ的集成测试。基本上,您只需编写:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(注意:目前我们需要将流装入一个箱子中,因为我们还没有实现IntSeq)

内部有一个LinkedList缓冲区,存储了从一个流中消耗但未从另一个流中消耗的所有值。如果你的两个流以大致相同的速率被消耗,那么这可能是最有效的。

算法的工作如下:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

这里有更多的源代码

实际上,使用 jOOλ,您可以编写完整的一行代码,如下所示:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);

4
谢谢,但目前被接受的答案确实表明:“如果在s1中有n1个高级元素,在s2中有n2个元素,你必须保持|n2 - n1|个元素在内存中以跟上节奏。如果你的流是无限的,那么所需存储空间将没有上限。” - necromancer

7

从Java 12开始,我们可以使用Collectors::teeing将主要流水线的元素传递给2个或更多下游收集器。

以您的示例为基础,我们可以执行以下操作:

@Test
void shouldProcessStreamElementsInTwoSeparateDownstreams() {
    class Result {
        List<Integer> multiplesOf7;
        List<Integer> multiplesOf5;

        Result(List<Integer> multiplesOf7, List<Integer> multiplesOf5) {
            this.multiplesOf7 = multiplesOf7;
            this.multiplesOf5 = multiplesOf5;
        }
    }

    var result = IntStream.range(1, 100)
            .filter(n -> n % 2 == 0)
            .boxed()
            .collect(Collectors.teeing(
                    Collectors.filtering(n -> n % 7 == 0, Collectors.toList()),
                    Collectors.filtering(n -> n % 5 == 0, Collectors.toList()),
                    Result::new
            ));

    assertTrue(result.multiplesOf7.stream().allMatch(n -> n % 7 == 0));
    assertTrue(result.multiplesOf5.stream().allMatch( n -> n % 5 == 0));
}

还有许多其他的收集器可以做其他的事情,例如在downstream中使用Collectors::mapping,您可以从同一源获得两个不同的对象/类型,如本文所示。


5

您还可以将流生成移动到单独的方法/函数中,该方法/函数返回此流并调用它两次。


4

有两种方法:

  • 将初始化放在一个方法中,然后再次调用该方法。

这种做法的优点是明确你正在做什么,并且适用于无限流。

  • 收集流并重新生成它。

在你的例子中:

final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();

那么

final IntStream s = IntStream.of(arr);

1
谢谢,我意识到有一个更简单的答案(请看我的回答);收集流并不是非常节省内存,并且对于无限流根本行不通。 - necromancer
1
你的回答没有涉及到对流进行处理。从你的问题中我理解你想要将单个流收集到一个Map中并对其进行sum()操作。你只是在谈论设置管道。 - Boris the Spider

3

更新:这个方法不起作用。请看下面原始答案后的解释。

我真是太傻了。我所需要做的就是:

Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10

为什么这样做不起作用:

如果你编写代码并尝试收集两个流,第一个流将会被很好地收集,但尝试流式处理第二个流将抛出异常:java.lang.IllegalStateException: stream has already been operated upon or closed

更具体地说,流是有状态的对象(顺便说一句,它们不能被重置或倒回)。你可以把它们看作迭代器,而迭代器又像指针。因此,stream14stream10可以被视为引用同一个指针。将第一个流完全消耗掉将导致指针“越过了末尾”。尝试消耗第二个流就像尝试访问已经“越过末尾”的指针,这自然是非法操作。

正如接受的答案所示,创建流的代码必须执行两次,但它可以被分隔成一个Supplier lambda或类似的结构。

完整测试代码:保存到Foo.java中,然后执行javac Foo.java,再执行java Foo

import java.util.stream.IntStream;

public class Foo {
  public static void main (String [] args) {
    IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
    IntStream s1 = s.filter(n -> n % 5 == 0);
    s1.forEach(n -> System.out.println(n));
    IntStream s2 = s.filter(n -> n % 7 == 0);
    s2.forEach(n -> System.out.println(n));
  }
}

输出:

$ javac Foo.java
$ java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
    at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
    at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
    at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
    at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
    at Foo.main(Foo.java:8)

不可变状态,好的,给你。;D - d1e
1
不要直接删除,你可以添加一些解释为什么这种方法行不通。这样其他人就可以从中学习。 - mschenk74
@mschenk74 给你 :) - necromancer

1
我认为使用空流与Concat可能符合您的需求。尝试像这样做: ```

我认为使用空流与Concat可能符合您的需求。尝试像这样做:

```
Stream<Integer> concat = Stream.concat(Stream.of(1, 2), Stream.empty());

1

直接回答是:是的

虽然没有专门支持此功能,但可以实现它。我看到可能的方法有:
a. 复制整个流数据,然后根据其创建流副本 -> RAM消耗可能会是一个障碍
b. 读取流并将其每个元素中继到副本中 -> 我将在下面详细说明这种方法

概念

让我们想象一下b.解决方案:
<T> List<Stream<T>> copyStream(int copiesCount, Stream<T> originalStream)
允许创建copiesCountoriginalStream 的副本。

为了理解这个解决方案,必须理解流和可能通过它流动的数据元素之间的区别:例如,苹果、胡萝卜和土豆将是数据元素,而它们流动到达某个目的地的管道将是流。复制一个Stream就像创建更多的管道:然后必须将原始管道(即originalStream)连接到其他管道(也称为streamCopies);虽然在现实世界中无法将苹果对象从一个管道传递到更多管道(即streamCopies),但在编程中这是可能的:只需将包含苹果对象引用的变量传递给流副本。 实现细节 Java实现的Stream对解决方案的形状有很大影响。第一个影响是与数据元素通过流(又称为管道)时发生的情况有关:要实际读取(和处理)Stream中的元素,必须使用终端方法,例如forEach。在我们的情况下,必须调用originalStream.forEach,以便读取每个元素并传递给streamCopies(即下游管道);这必须在copyStream()方法返回之前发生,这是不好的,因为forEach将阻塞,直到所有originalStream元素都被消耗完。为了解决这个问题,copyStream()的实现将在一个线程中调用originalStream.forEach。消耗originalStream元素意味着将它们传递给下游管道(即streamCopies);由于没有缓存,必须确保每个originalStream元素在进入下一个元素之前传递给每个streamCopies。这意味着所有的streamCopies必须同时消费:如果某个streamCopies没有消费,它将阻塞所有其他streamCopies,因为originalStream将停止传递到下游管道,直到每个人都消费了当前元素(也就是说,它不会为后来的streamCopies消费者缓存任何内容)。但是,在Java中消耗一个Stream意味着在其上调用终端操作(例如forEach),这会阻塞执行,直到整个流被消耗完;因为我们需要所有的streamCopies并行消耗,所以必须为每个线程创建一个独立的线程!嗯,作为一个杂项事实,事实上可以在当前(主)线程上消耗其中一个streamCopies。总结一下,解决方案的使用如下所示:
List<Stream<?>> streamCopies = copyStream(copiesCount, originalStream);`  
// start a thread for each `streamCopies` into which consume the corresponding 
// stream copy (one of them could be consumed on the current thread though)
// optionally join the consuming threads
// continue your whatever business logic you have

最终考虑

上述一些限制可以被规避:

复制过程是破坏性的,即在调用copyStream()后,originalStream将无法使用,因为它将处于待消耗状态。如果确实想要使用它,可以创建一个额外的副本,可能在当前(主)线程上进行消费(但仅在启动所有其他副本的消费之后)。
streamCopies必须消耗所有接收到的originalStream元素,否则,如果有一个停止了,其他人也会阻塞(请再次阅读“实现细节”部分以了解原因)。这意味着每个streamCopies元素的消耗都必须发生在try ... catch中,以确保没有失败(也称为处理停止)。实际的生产实现事实上会通过包装每个Stream副本来规避这种情况,其中包含一些覆盖close()方法的内容,以从originalStream到streamCopies传输逻辑中删除失败的流副本(也就是丢弃用于在originalStream线程和originalStream线程之间通信的底层blockingQueue-请参见下面的实现)。这意味着客户端将被强制关闭Stream副本,但这并不是很常见,例如查看Spring的JDBCTemplate.queryForStream()结果具有相同的要求。
与先前指出的一样,每个streamCopies终端操作必须在单独的线程中执行-没有解决方法。

代码

以下是实现 b. 解决方案的代码和检查其正确性的测试。

@Test
void streamCopyTest() throws ExecutionException, InterruptedException {
    // streamCopies are valid/normal Stream 
    // instances (e.g. it is allowed to be infinite)
    List<Stream<String>> streamCopies = copyStream(3, Stream.of("a", "b", "c", "d"));
    // The 3 copies relay on the original stream which can’t be
    // consumed more than once! Consuming the copies one by one
    // in the same thread isn’t possible because 1st consumed 
    // copy would leave nothing to consume for the others, 
    // so they must be consumed in parallel.
    ExecutorService executorService = Executors.newCachedThreadPool();
    CompletableFuture<?>[] futures =
            streamCopies.stream().map(stream -> CompletableFuture.runAsync(() -> {
                // the same consumption logic for all streamCopies is 
                // used here because this is just an example; the 
                // actual consumption logic could be distinct (and anything)
                String outcome = stream.collect(Collectors.joining(", "));
                // check the thread name in the message to differentiate the outcome
                log.info("\n{}", outcome);
            }, executorService)).toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).get();
    executorService.shutdown();
}

@RequiredArgsConstructor
@Slf4j
public class StreamCopiesFactory {

    /**
     * The amount of elements to be stored in the blockingQueue used 
     * to transfer elements from the original stream to its copies. 
     * This is very different to the cache use for the a. solution:
     * here is about the transfer between original stream and its 
     * copies instead of the entire original stream data-copy.
     * Change or make this configurable.
     */
    private static final int cacheSize = 1;

    /**
     * Each of these stream copies must execute (their terminal operation)
     * on a distinct thread! One of them could actually execute on the 
     * main thread, but only after all the others were started on their 
     * distinct thread.
     */
    public static <T> List<Stream<T>> copyStream(int copies, Stream<T> stream) {
        List<BlockingQueue<Object>> blockingQueues = new ArrayList<>(copies);
        // creating the queues used to relay the stream's elements to the stream's copies
        for (int i = 0; i < copies; i++) {
            blockingQueues.add(new LinkedBlockingQueue<>(cacheSize));
        }
        // consume the stream copies in a distinct thread, otherwise 
        // bq.put (transferring for the next stream copy) would block  
        // because the 2nd stream copy isn't yet consuming 
        Executors.newSingleThreadExecutor().execute(() -> {
            stream.forEach(streamElement -> blockingQueues.forEach(bq -> {
                try {
                    bq.put(streamElement);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here other than maybe simple optimization related to the
                    // failed bq.put (e.g. sending END_SIGNAL into bq then skipping its next put calls)
                }
            }));
            blockingQueues.forEach(bq -> {
                try {
                    bq.put(END_SIGNAL);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // nothing to do here
                }
            });
        });
        // creating the copies
        // A production implementation would wrap each Stream copy with 
        // something overwriting close() which to remove from blockingQueues
        // the blockingQueue corresponding to the closed Stream.
        return blockingQueues.stream().map(bq -> new SpliteratorCopy<T>(bq))
                .map(spliterator -> StreamSupport.stream(spliterator, false))
                .collect(Collectors.toList());
    }
}

@RequiredArgsConstructor
@Slf4j
public class SpliteratorCopy<T> implements Spliterator<T> {

    public static final Object END_SIGNAL = new Object();

    private final BlockingQueue<?> blockingQueue;

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        Object nextElement;
        try {
            nextElement = blockingQueue.take();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        if (nextElement == END_SIGNAL) {
            return false;
        }
        action.accept((T) nextElement);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }
}

1

我使用了this的优秀答案来编写以下类:

public class SplitStream<T> implements Stream<T> {
    private final Supplier<Stream<T>> streamSupplier;

    public SplitStream(Supplier<Stream<T>> t) {
        this.streamSupplier = t;
    }

    @Override
    public Stream<T> filter(Predicate<? super T> predicate) {
        return streamSupplier.get().filter(predicate);
    }

    @Override
    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return streamSupplier.get().map(mapper);
    }

    @Override
    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return streamSupplier.get().mapToInt(mapper);
    }

    @Override
    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return streamSupplier.get().mapToLong(mapper);
    }

    @Override
    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return streamSupplier.get().mapToDouble(mapper);
    }

    @Override
    public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
        return streamSupplier.get().flatMap(mapper);
    }

    @Override
    public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
        return streamSupplier.get().flatMapToInt(mapper);
    }

    @Override
    public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
        return streamSupplier.get().flatMapToLong(mapper);
    }

    @Override
    public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
        return streamSupplier.get().flatMapToDouble(mapper);
    }

    @Override
    public Stream<T> distinct() {
        return streamSupplier.get().distinct();
    }

    @Override
    public Stream<T> sorted() {
        return streamSupplier.get().sorted();
    }

    @Override
    public Stream<T> sorted(Comparator<? super T> comparator) {
        return streamSupplier.get().sorted(comparator);
    }

    @Override
    public Stream<T> peek(Consumer<? super T> action) {
        return streamSupplier.get().peek(action);
    }

    @Override
    public Stream<T> limit(long maxSize) {
        return streamSupplier.get().limit(maxSize);
    }

    @Override
    public Stream<T> skip(long n) {
        return streamSupplier.get().skip(n);
    }

    @Override
    public void forEach(Consumer<? super T> action) {
        streamSupplier.get().forEach(action);
    }

    @Override
    public void forEachOrdered(Consumer<? super T> action) {
        streamSupplier.get().forEachOrdered(action);
    }

    @Override
    public Object[] toArray() {
        return streamSupplier.get().toArray();
    }

    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        return streamSupplier.get().toArray(generator);
    }

    @Override
    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return streamSupplier.get().reduce(identity, accumulator);
    }

    @Override
    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return streamSupplier.get().reduce(accumulator);
    }

    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
        return streamSupplier.get().reduce(identity, accumulator, combiner);
    }

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return streamSupplier.get().collect(supplier, accumulator, combiner);
    }

    @Override
    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return streamSupplier.get().collect(collector);
    }

    @Override
    public Optional<T> min(Comparator<? super T> comparator) {
        return streamSupplier.get().min(comparator);
    }

    @Override
    public Optional<T> max(Comparator<? super T> comparator) {
        return streamSupplier.get().max(comparator);
    }

    @Override
    public long count() {
        return streamSupplier.get().count();
    }

    @Override
    public boolean anyMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().anyMatch(predicate);
    }

    @Override
    public boolean allMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().allMatch(predicate);
    }

    @Override
    public boolean noneMatch(Predicate<? super T> predicate) {
        return streamSupplier.get().noneMatch(predicate);
    }

    @Override
    public Optional<T> findFirst() {
        return streamSupplier.get().findFirst();
    }

    @Override
    public Optional<T> findAny() {
        return streamSupplier.get().findAny();
    }

    @Override
    public Iterator<T> iterator() {
        return streamSupplier.get().iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return streamSupplier.get().spliterator();
    }

    @Override
    public boolean isParallel() {
        return streamSupplier.get().isParallel();
    }

    @Override
    public Stream<T> sequential() {
        return streamSupplier.get().sequential();
    }

    @Override
    public Stream<T> parallel() {
        return streamSupplier.get().parallel();
    }

    @Override
    public Stream<T> unordered() {
        return streamSupplier.get().unordered();
    }

    @Override
    public Stream<T> onClose(Runnable closeHandler) {
        return streamSupplier.get().onClose(closeHandler);
    }

    @Override
    public void close() {
        streamSupplier.get().close();
    }
}

当您调用其类的任何方法时,它会委托调用给。
streamSupplier.get()

所以,不是这样写:

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

你可以做:

SplitStream<Integer> stream = 
    new SplitStream<>(() -> IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed());
stream.filter(...);
stream.filter(...);

你可以将其扩展到与IntStream、DoubleStream等一起使用...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接