Java Streams - 缓冲大型数据流

5
我试图将由大量数据支持的多个流合并为一个流,然后对它们进行缓冲。我可以轻松地将这些流合并为单个项目流。然而,在尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。
我花了一些时间来缩小问题范围,以下是一些代码。
我可以重构代码以避免此问题,但在不确切了解为什么会出现这种情况的情况下,我觉得使用流就像定时炸弹一样危险。
我参考了Java 8流上的缓冲操作的灵感
import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{

   //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
   /**
    * Batch a stream into chunks
    */
   public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
   {
      final Iterator<T> streamIterator = stream.iterator();

      return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
      {
         @Override public boolean hasNext()
         {
            return streamIterator.hasNext();
         }

         @Override public List<T> next()
         {
            List<T> intermediate = new ArrayList<>();
            for (long v = 0; v < count && hasNext(); v++)
            {
               intermediate.add(streamIterator.next());
            }
            return intermediate;
         }
      }, 0), false);
   }

   public static void main(String[] args)
   {

      //create streams from huge datasets
      Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                       LongStream.range(0, Integer.MAX_VALUE).boxed())
                                   //collapse into one stream
                                   .flatMap(x -> x);
      //iterating over the stream one item at a time is OK..
//      streams.forEach(x -> {

      //buffering the stream is NOT ok, you will go OOM
      buffer(streams, 25).forEach(x -> {
         try
         {
            Thread.sleep(2500);
         }
         catch (InterruptedException ignore)
         {
         }
         System.out.println(x);
      });
   }
}


3
请参阅为什么 Java 流中的 flatMap() 后面跟 filter() “不完全”是惰性的?。您可以更新您的 Java 版本,或者对于两个流,您可以使用Stream.concat(…)而不是Stream.of(…).flatMap(…) - Holger
我正在使用Java 11.0.5,所以我不认为是这个问题......不过很有趣 - Stream.concat似乎可以工作!也许他们添加的修复程序并不适用于所有情况。谢谢! - Michael Martin
1个回答

6
这似乎与旧问题“为什么在Java Streams中对flatMap()之后进行filter()不完全是惰性的?”有关。尽管该问题已经针对Stream的内置操作得到解决,但当我们尝试在外部迭代flatmapped流时,仍然存在该问题。
我们可以简化代码以重现问题:
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .iterator().hasNext();

请注意,使用 Spliterator 也会受到影响。
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .spliterator()
    .tryAdvance((long l) -> System.out.println("first item: "+l));

两者都试图缓冲元素,直到最终由于 OutOfMemoryError 而退出。

由于 spliterator().forEachRemaining(…) 似乎不受影响,因此您可以实现一个适用于 forEach 的解决方案,但它会很脆弱,因为它仍然会在短路流操作中出现问题。

public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            (source.estimateSize()+count-1)/count, source.characteristics()
                &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}

但请注意,基于Spliterator的解决方案通常更可取,因为它们支持携带额外的信息以实现优化,并在许多使用情况下具有更低的迭代成本。因此,一旦JDK代码中修复了此问题,这就是前进的方式。
作为解决方法,您可以使用Stream.concat(...)来组合流,但是在其文档中明确警告不要一次组合太多的流:

当从重复连接构造流时,请谨慎。访问深度连接的流的元素可能会导致深层调用链或甚至StackOverflowException[错别字]。

Java 9文档已将异常名称更正为StackOverflowError


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接