我们曾经有类似的问题需要解决。我们想要处理一些大于系统内存的流(遍历数据库中的所有对象),并尽可能地将其随机排序 - 我们认为缓冲10,000个项目并对它们进行随机排序应该是可以的。
目标是一个接受流作为输入的函数。
在这里提出的解决方案中,似乎有各种选项:
- 使用各种非Java 8附加库
- 从不是流的东西开始 - 比如随机访问列表
- 拥有一个可以轻松分割的流
我们最初的直觉是使用自定义收集器,但这意味着退出流处理。上面的自定义收集器解决方案非常好,我们差点用它。
这里有一个解决方案是通过利用Stream
可以给你一个Iterator
的事实来欺骗。你可以使用逃生口来做一些额外的事情,而流不支持这样做。使用另一个Java 8 StreamSupport
魔法将Iterator
转换回流。
public class BatchingIterator<T> implements Iterator<List<T>> {
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
使用它的简单示例如下所示:
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
上面的内容打印出来是:
[A, B, C]
[D, E, F]
对于我们的应用场景,我们想要将批次打乱然后保持其作为一个流 - 看起来像这样:
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
这将输出类似于以下内容(每次随机生成,因此每次不同)
A
C
B
E
D
F
这里的秘密在于始终有一个流,因此您可以对批处理数据流进行操作,或对每个批次执行某些操作,然后将其flatMap
回流中。更好的是,所有上述操作只在最终的forEach
、collect
或其他终止表达式 拉取 数据通过流。
事实证明,iterator
是流的一种特殊类型的终止操作,不会导致整个流运行并进入内存!感谢Java 8开发者的出色设计!
flatMap
的反函数(+ 一个额外的 flatMap 来再次折叠流)?我不认为标准库中存在这样一个方便的方法。你要么得找到一个第三方库,要么就得根据 spliterator 和/或发出流的收集器编写自己的库。 - the8472Stream.generate
与reader::readLine
和limit
结合起来,但问题是流与异常不太兼容。此外,这也很可能不容易并行化。我认为for
循环仍然是最好的选择。 - tobias_k