我试图将由大量数据支持的多个流合并为一个流,然后对它们进行缓冲。我可以轻松地将这些流合并为单个项目流。然而,在尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。
我花了一些时间来缩小问题范围,以下是一些代码。
我可以重构代码以避免此问题,但在不确切了解为什么会出现这种情况的情况下,我觉得使用流就像定时炸弹一样危险。
我参考了Java 8流上的缓冲操作的灵感。
我花了一些时间来缩小问题范围,以下是一些代码。
我可以重构代码以避免此问题,但在不确切了解为什么会出现这种情况的情况下,我觉得使用流就像定时炸弹一样危险。
我参考了Java 8流上的缓冲操作的灵感。
import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class BreakStreams
{
//@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
/**
* Batch a stream into chunks
*/
public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
{
final Iterator<T> streamIterator = stream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
{
@Override public boolean hasNext()
{
return streamIterator.hasNext();
}
@Override public List<T> next()
{
List<T> intermediate = new ArrayList<>();
for (long v = 0; v < count && hasNext(); v++)
{
intermediate.add(streamIterator.next());
}
return intermediate;
}
}, 0), false);
}
public static void main(String[] args)
{
//create streams from huge datasets
Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
LongStream.range(0, Integer.MAX_VALUE).boxed())
//collapse into one stream
.flatMap(x -> x);
//iterating over the stream one item at a time is OK..
// streams.forEach(x -> {
//buffering the stream is NOT ok, you will go OOM
buffer(streams, 25).forEach(x -> {
try
{
Thread.sleep(2500);
}
catch (InterruptedException ignore)
{
}
System.out.println(x);
});
}
}
Stream.concat(…)
而不是Stream.of(…).flatMap(…)
。 - Holger