在Java 8流上实现自定义中间操作

9

我正在尝试弄清楚如何在Java 8 Stream上实现自定义中间操作。但似乎我已经被锁定了 :(

具体来说,我想将一个流转换为每个条目,直到遇到第一个具有特定值的条目,包括这个条目在内。然后停止生成任何后续的条目 - 使其具有短路效应。

它正在对输入数据运行一系列验证检查。如果有错误,我希望在第一个错误处停止,但在此期间,我想收集警告信息。并且由于这些验证检查可能是昂贵的 - 涉及数据库查找等等,所以我只想运行必要的最小集合。

因此代码大致如下:

Optional<ValidationResult> result = validators.stream()
    .map(validator -> validator.validate(data))
    .takeUntil(result -> result.isError()) // This is the bit I can't do
    .reduce(new ValidationResult(), ::mergeResults);

看起来我应该能够使用ReferencePipeline.StatefulOp实现某些操作,但它都是包范围的,所以无法扩展。因此,我想知道正确的实现方式是什么?或者是否可能?

还需要注意的是 - 这需要在Java 8中完成,而不是9+,因为由于各种无关原因,我们还没有升级。

谢谢


1
寻找 Java-9 中的 takeWhile - Naman
也许这个答案可以帮助你在Java 8中创建自己的takeWhile()函数。 - Samuel Philipp
ValidationResult 中有哪些值?是否可以忽略其字段,只关心 isError 和剩下的验证器?如果是这样,请查看我的答案... - buræquete
这似乎是你尝试达到的目标。 - Angel Koh
1
@Naman takeWhile 不起作用,因为问题中包括“包括”一词。 - Eugene
你有检查过任何答案吗?我有一个不同的方法,很想听听你的反馈! - buræquete
4个回答

3

通常情况下,自定义操作需要处理 Spliterator 接口。它通过添加特征和大小信息以及拆分元素中的一部分作为另一个 Spliterator(因此得名)来扩展了 Iterator 的概念。它还通过只需要一个方法简化了迭代逻辑。

public static <T> Stream<T> takeWhile(Stream<T> s, Predicate<? super T> condition) {
    boolean parallel = s.isParallel();
    Spliterator<T> spliterator = s.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
        spliterator.estimateSize(),
        spliterator.characteristics()&~(Spliterator.SIZED|Spliterator.SUBSIZED)) {
            boolean active = true;
            Consumer<? super T> current;
            Consumer<T> adapter = t -> {
                if((active = condition.test(t))) current.accept(t);
            };

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if(!active) return false;
                current = action;
                try {
                    return spliterator.tryAdvance(adapter) && active;
                }
                finally {
                    current = null;
                }
            }
        }, parallel).onClose(s::close);
}

为了保持流的属性,我们首先查询并重新建立新流的并行状态。同时,我们注册一个关闭操作,以关闭原始流。
主要工作是实现一个装饰前一个流状态的Spliterator。
除了SIZED和SUBSIZED之外,特性都被保留下来,因为我们的操作结果大小是不可预测的。原始大小仍然传递,现在将用作估计值。
此解决方案在操作期间存储传递给tryAdvance的Consumer,以便能够使用相同的适配器Consumer,避免为每个迭代创建新的Consumer。这是可行的,因为保证tryAdvance不会并发调用。
并行性是通过继承自AbstractSpliterator的分割完成的。这个继承的实现将缓冲一些元素,这是合理的,因为对于像takeWhile这样的操作,实现更好的策略真的很复杂。
所以你可以像这样使用它:
    takeWhile(Stream.of("foo", "bar", "baz", "hello", "world"), s -> s.length() == 3)
        .forEach(System.out::println);

将会打印出来

foo
bar
baz

或者

takeWhile(Stream.of("foo", "bar", "baz", "hello", "world")
    .peek(s -> System.out.println("before takeWhile: "+s)), s -> s.length() == 3)
    .peek(s -> System.out.println("after takeWhile: "+s))
    .forEach(System.out::println);

将会打印

before takeWhile: foo
after takeWhile: foo
foo
before takeWhile: bar
after takeWhile: bar
bar
before takeWhile: baz
after takeWhile: baz
baz
before takeWhile: hello

这说明它不会在必要时处理超过所需的元素。在 takeWhile 阶段之前,我们必须遇到第一个不匹配的元素,然后,我们只会遇到那些元素。


Spliterator是Java流设计的一个非常糟糕的例子。你想要一个简单的流"中间操作",比如说过滤掉流中的空元素?你可以选择使用mapMulti,但是在你的流水线中读取它将会非常困难!或者你可以用这个可复用的东西来实现它!每个人都必须处理超级强大的能力来为并行性划分流,即使在98.5%的使用情况下这不是所需的——也不是所想要的特性。与使用C# LINQ扩展方法实现相比较,Java做得很好! - davidbak
1
@davidbak,你可以使用.filter(x -> x != null)或者.filter(Objects::nonNull)来“过滤掉流中的空元素”。对于这样一个琐碎的任务,没有必要实现自定义处理。 - Holger
1
@davidbak mapMulti只是针对元素数量较少的flatMap的优化。在JDK 16之前,你甚至不知道自己“需要mapMulti”。这只是另一个关于人为情景的讨论。如果你认为C#很棒,那就用C#,开心就好,别再用你的抱怨来打扰我了。 - Holger

1

我承认从代码角度来看,Holger的答案更加优美,但这个可能更容易理解:

public static <T> Stream<T> takeUntilIncluding(Stream<T> s, Predicate<? super T> condition) {

    class Box implements Consumer<T> {

        boolean stop = false;

        T t;

        @Override
        public void accept(T t) {
            this.t = t;
        }
    }

    Box box = new Box();

    Spliterator<T> original = s.spliterator();

    return StreamSupport.stream(new AbstractSpliterator<>(
        original.estimateSize(),
        original.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)) {

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {

            if (!box.stop && original.tryAdvance(box) && condition.test(box.t)) {
                action.accept(box.t);
                return true;
            }

            box.stop = true;

            return false;
        }
    }, s.isParallel());

}

0

你可以用一个技巧来实现:

List<ValidationResult> res = new ArrayList<>(); // Can modify it with your `mergeResults` instead of list

Optional<ValidationResult> result = validators.stream()
    .map(validator -> validator.validate(data))
    .map(v -> {
       res.add(v);
       return v;
    })
    .filter(result -> result.isError())
    .findFirst();

List<ValidationResult> res 将包含您感兴趣的值。


0
您可以使用以下结构;
AtomicBoolean gateKeeper = new AtomicBoolean(true);    
Optional<Foo> result = validators.stream()
    .filter(validator -> gateKeeper.get() 
                && gateKeeper.compareAndSet(true, !validator.validate(data).isError()) 
                && gateKeeper.get())
    .reduce(...) //have the first n non-error validators here

gateKeeper 过滤器作为一种短路逻辑,会一直执行直到遇到第一个 isError() == true 的情况,拒绝它,然后关闭其他 validate() 调用的大门。看起来有点疯狂,但比其他自定义实现要简单得多,如果符合您的要求,可能完美地工作。

不确定这是否有帮助,因为我忽略了 validator.validate(data) 的结果,除了 isError() 的结果,以及它属于列表中的哪个 validator


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接