有没有办法停止Lambda闭包中的Stream.generate函数?

13

我刚开始尝试使用Java 8和Lambda表达式,我很好奇是否可以通过返回特定值(比如null)从Lambda表达式内部停止Stream的生成。使用Stream.generate()是否可行?

private int counter;

private void generate()
{
    System.out.println(Stream.generate(() -> {
        if (counter < 10) {
            counter++;
            return RandomUtils.nextInt(100);
        } else {
            return null;
        }
    }).count());
}

不幸的是,这段代码没有终止条件,因此仅仅返回null是无法使程序跳出数据流的。

7个回答

39

Java 9及以上版本包含此方法

Stream<T> takeWhile(Predicate<? super T> predicate); 

通过条件限制流。因此,下面的解决方法不再需要。

原始答案(适用于Java 9之前的版本):

使用Stream.generate,从lambda闭包中定义上来说,这是不可能的。它是无限的。使用limit()可以使您的流具有固定的大小。但这对于诸如以下条件并没有帮助:

if random>10 then stop

有可能通过条件来限制潜在的无限流。如果不知道大小,这非常有用。你的朋友在这里是一个Spliterator,你的示例代码将如下所示:

System.out.println( StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<Integer>() {
    int counter = 0;

    @Override
    public boolean hasNext() {
        return counter < 10;
    }

    @Override
    public Integer next() {
        counter++;
        return RandomUtils.nextInt(100);
    }
}, Spliterator.IMMUTABLE), false).count());

基本上,您可以从迭代器构建一个。例如,我正在使用这个结构来解析Stax XML中的流。

我知道这不是通过lambda构造完成的,但是它解决了此缺失功能的问题,即通过条件停止流项目生成。

如果有更好的方法来实现这一点(我指的是这个流结构而不是XML处理),或者在这种方式下使用流存在根本性缺陷,我非常感兴趣。


有时候无法确定是否存在下一个元素,这使得两种解决方案都不可能。 - Max
你能给个例子吗? - wumpz
想象一下一个流,它可以将返回的多个未来集合压平。您希望在所有集合都可用之前开始流式传输,因此您不知道未来集合中是否包含更多元素。 - Max

7
这是使用Lambda表达式无法实现的,你无法从表达式内部控制流程。即使API文档也说Stream.generate会生成一个无限流。
然而,你可以使用limit()方法来限制Stream并实现所需的功能:
System.out.println(Stream.generate(() -> RandomUtils.nextInt(100)).limit(10).count());

1
这是不正确的。 - Dave

1
// If you are not looking for parallelism, you can use following method:
public static <T> Stream<T> breakStream(Stream<T> stream, Predicate<T> terminate) { 
  final Iterator<T> original = stream.iterator();
  Iterable<T> iter = () -> new Iterator<T>() { 
    T t;
    boolean hasValue = false;

    @Override
    public boolean hasNext() { 
      if (!original.hasNext()) { 
        return false;
      } 
      t = original.next();
      hasValue = true;
      if (terminate.test(t)) { 
        return false;
      } 
      return true;
    } 

    @Override
    public T next() { 
      if (hasValue) { 
        hasValue = false;
        return t;
      } 
      return t;
    } 
  };

  return StreamSupport.stream(iter.spliterator(), false);
}

0

这是 Java 8 的另一种解决方案(它需要一个 Stream.Builder,可能不是最优的,但它非常简单):

@SuppressWarnings("ResultOfMethodCallIgnored")
public static <T> Stream<T> streamBreakable(Stream<T> stream, Predicate<T> stopCondition) {
    Stream.Builder<T> builder = Stream.builder();
    stream.map(t -> {
                boolean stop = stopCondition.test(t);
                if (!stop) {
                    builder.add(t);
                }
                return stop;
            })
            .filter(result -> result)
            .findFirst();

    return builder.build();
}

还有测试:

@Test
public void shouldStop() {

    AtomicInteger count = new AtomicInteger(0);
    Stream<Integer> stream = Stream.generate(() -> {
        if (count.getAndIncrement() < 10) {
            return (int) (Math.random() * 100);
        } else {
            return null;
        }
    });

    List<Integer> list = streamBreakable(stream, Objects::isNull)
            .collect(Collectors.toList());

    System.out.println(list);
}

0

使用StreamSupport.stream(Spliterator, boolean)
请参阅Spliterator的JavaDoc。
这是一个Spliterator示例:

public class GeneratingSpliterator<T> implements Spliterator<T>
{
    private Supplier<T> supplier;
    private Predicate<T> predicate;

    public GeneratingSpliterator(final Supplier<T> newSupplier, final Predicate<T> newPredicate)
    {
        supplier = newSupplier;
        predicate = newPredicate;
    }

    @Override
    public int characteristics()
    {
        return 0;
    }

    @Override
    public long estimateSize()
    {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action)
    {
        T newObject = supplier.get();
        boolean ret = predicate.test(newObject);
        if(ret) action.accept(newObject);
        return ret;
    }

    @Override
    public Spliterator<T> trySplit()
    {
        return null;
    }
}

-1

这是可能的,你只需要打破常规思维。

以下想法借鉴自Python,这门语言向我介绍了生成器函数...

当你在Supplier<T>闭包中完成后,只需抛出一个RuntimeException实例,并在调用处捕获并忽略它。

以下是一个示例摘录(请注意,我添加了一个安全捕获Stream.limit(Long.MAX_VALUE)以覆盖意外情况,尽管它永远不应该被触发):

static <T> Stream<T> read(String path, FieldSetMapper<T> fieldSetMapper) throws IOException {
    ClassPathResource resource = new ClassPathResource(path);
    DefaultLineMapper<T> lineMapper = new DefaultLineMapper<>();
    lineMapper.setFieldSetMapper(fieldSetMapper);
    lineMapper.setLineTokenizer(getTokenizer(resource));

    return Stream.generate(new Supplier<T>() {
        FlatFileItemReader<T> itemReader = new FlatFileItemReader<>();
        int line = 1;
        {
            itemReader.setResource(resource);
            itemReader.setLineMapper(lineMapper);
            itemReader.setRecordSeparatorPolicy(new DefaultRecordSeparatorPolicy());
            itemReader.setLinesToSkip(1);
            itemReader.open(new ExecutionContext());
        }

        @Override
        public T get() {
            T item = null;
            ++line;
            try {
                item = itemReader.read();
                if (item == null) {
                    throw new StopIterationException();
                }
            } catch (StopIterationException ex) {
                throw ex;
            } catch (Exception ex) {
                LOG.log(WARNING, ex,
                        () -> format("%s reading line %d of %s", ex.getClass().getSimpleName(), line, resource));
            }
            return item;
        }
    }).limit(Long.MAX_VALUE).filter(Objects::nonNull);
}

static class StopIterationException extends RuntimeException {}

public void init() {
    if (repository.count() == 0) {
        Level logLevel = INFO;
        try {
            read("providers.csv", fields -> new Provider(
                    fields.readString("code"),
                    fields.readString("name"),
                    LocalDate.parse(fields.readString("effectiveStart"), DateTimeFormatter.ISO_LOCAL_DATE),
                    LocalDate.parse(fields.readString("effectiveEnd"), DateTimeFormatter.ISO_LOCAL_DATE)
            )).forEach(repository::save);
        } catch (IOException e) {
            logLevel = WARNING;
            LOG.log(logLevel, "Initialization was interrupted");
        } catch (StopIterationException ignored) {}
        LOG.log(logLevel, "{} providers imported.", repository.count());
    }
}

1
那不是跳出常规思维的表现,而是利用异常来控制流程。这通常被认为是一种不好的做法,有很多原因。 - Andrew Charneski
你可以争论好的和坏的设计,但唯一可行的选择是实际有效的。普遍认为什么是好的或坏的规则都有附带条件,用户应该理解指定状态的基本原理。在这种情况下,原因是潜在的代码混淆和异常处理设置的成本。这些原因在这里都不相关,并且它们被需要一个工作解决方案所取代。在某些语言中,如Python,使用异常进行流程控制是正常的 - 这是核心语言类中迭代终止的方式。 - Dave
最终,不使用异常来进行流程控制源于异常的产生方式 - 停止使用流程控制(即魔术结果值)来表示异常状态。并且允许异常处理与流程控制正交。这些都是很好的想法,但现实总是胜过理想主义。在这种情况下,抛出异常是向调用代码传递消息的唯一可用机制。 - Dave

-4
我的解决方案是在完成时生成一个空值,然后应用过滤器。
Stream
 .generate( o -> newObject() )
 .filter( o -> o != null )
 .forEach(...)

6
无法正常工作,因为在这种情况下发电机不会停止。 - ultraon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接