直接回答是:是的
虽然没有专门支持此功能,但可以实现它。我看到可能的方法有:
a. 复制整个流数据,然后根据其创建流副本 -> RAM消耗可能会是一个障碍
b. 读取流并将其每个元素中继到副本中 -> 我将在下面详细说明这种方法
概念
让我们想象一下b.
解决方案:
<T> List<Stream<T>> copyStream(int copiesCount, Stream<T> originalStream)
允许创建copiesCount
个originalStream
的副本。
为了理解这个解决方案,必须理解流和可能通过它流动的数据元素之间的区别:例如,苹果、胡萝卜和土豆将是数据元素,而它们流动到达某个目的地的管道将是流。复制一个
Stream
就像创建更多的管道:然后必须将原始管道(即
originalStream
)连接到其他管道(也称为
streamCopies
);虽然在现实世界中无法将苹果对象从一个管道传递到更多管道(即
streamCopies
),但在编程中这是可能的:只需将包含苹果对象引用的变量传递给流副本。
实现细节
Java实现的Stream对解决方案的形状有很大影响。第一个影响是与数据元素通过流(又称为管道)时发生的情况有关:要实际读取(和处理)
Stream
中的元素,必须使用终端方法,例如
forEach
。在我们的情况下,必须调用
originalStream.forEach
,以便读取每个元素并传递给
streamCopies
(即下游管道);这必须在
copyStream()
方法返回之前发生,这是不好的,因为
forEach
将阻塞,直到所有
originalStream
元素都被消耗完。为了解决这个问题,
copyStream()
的实现将在一个线程中调用
originalStream.forEach
。消耗
originalStream
元素意味着将它们传递给下游管道(即
streamCopies
);由于没有缓存,必须确保每个
originalStream
元素在进入下一个元素之前传递给每个
streamCopies
。这意味着所有的
streamCopies
必须同时消费:如果某个
streamCopies
没有消费,它将阻塞所有其他
streamCopies
,因为
originalStream
将停止传递到下游管道,直到每个人都消费了当前元素(也就是说,它不会为后来的
streamCopies
消费者缓存任何内容)。但是,在Java中消耗一个
Stream
意味着在其上调用终端操作(例如
forEach
),这会阻塞执行,直到整个流被消耗完;因为我们需要所有的
streamCopies
并行消耗,所以必须为每个线程创建一个独立的线程!嗯,作为一个杂项事实,事实上可以在当前(主)线程上消耗其中一个
streamCopies
。总结一下,解决方案的使用如下所示:
List<Stream<?>> streamCopies = copyStream(copiesCount, originalStream);`
最终考虑
上述一些限制可以被规避:
复制过程是破坏性的,即在调用copyStream()后,originalStream将无法使用,因为它将处于待消耗状态。如果确实想要使用它,可以创建一个额外的副本,可能在当前(主)线程上进行消费(但仅在启动所有其他副本的消费之后)。
streamCopies必须消耗所有接收到的originalStream元素,否则,如果有一个停止了,其他人也会阻塞(请再次阅读“实现细节”部分以了解原因)。这意味着每个streamCopies元素的消耗都必须发生在try ... catch中,以确保没有失败(也称为处理停止)。实际的生产实现事实上会通过包装每个Stream副本来规避这种情况,其中包含一些覆盖close()方法的内容,以从originalStream到streamCopies传输逻辑中删除失败的流副本(也就是丢弃用于在originalStream线程和originalStream线程之间通信的底层blockingQueue-请参见下面的实现)。这意味着客户端将被强制关闭Stream副本,但这并不是很常见,例如查看Spring的JDBCTemplate.queryForStream()结果具有相同的要求。
与先前指出的一样,每个streamCopies终端操作必须在单独的线程中执行-没有解决方法。
代码
以下是实现 b.
解决方案的代码和检查其正确性的测试。
@Test
void streamCopyTest() throws ExecutionException, InterruptedException {
List<Stream<String>> streamCopies = copyStream(3, Stream.of("a", "b", "c", "d"));
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<?>[] futures =
streamCopies.stream().map(stream -> CompletableFuture.runAsync(() -> {
String outcome = stream.collect(Collectors.joining(", "));
log.info("\n{}", outcome);
}, executorService)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).get();
executorService.shutdown();
}
@RequiredArgsConstructor
@Slf4j
public class StreamCopiesFactory {
private static final int cacheSize = 1;
public static <T> List<Stream<T>> copyStream(int copies, Stream<T> stream) {
List<BlockingQueue<Object>> blockingQueues = new ArrayList<>(copies);
for (int i = 0; i < copies; i++) {
blockingQueues.add(new LinkedBlockingQueue<>(cacheSize));
}
Executors.newSingleThreadExecutor().execute(() -> {
stream.forEach(streamElement -> blockingQueues.forEach(bq -> {
try {
bq.put(streamElement);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}));
blockingQueues.forEach(bq -> {
try {
bq.put(END_SIGNAL);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
});
});
return blockingQueues.stream().map(bq -> new SpliteratorCopy<T>(bq))
.map(spliterator -> StreamSupport.stream(spliterator, false))
.collect(Collectors.toList());
}
}
@RequiredArgsConstructor
@Slf4j
public class SpliteratorCopy<T> implements Spliterator<T> {
public static final Object END_SIGNAL = new Object();
private final BlockingQueue<?> blockingQueue;
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
Object nextElement;
try {
nextElement = blockingQueue.take();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
if (nextElement == END_SIGNAL) {
return false;
}
action.accept((T) nextElement);
return true;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return Spliterator.ORDERED;
}
}