Java 8分叉流用于MapUntil

3

我最近开始使用Java Stream技术。 现在我碰到了以下的问题:

Stream<T> mapUntil(Stream<T> in, Function<T,T> mapFunc, Predicate<Stream<T>> predicate)

或更一般性的。
Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> predicate)

他们的天真实现
Stream<T> mapUntil(Stream<T> in, Function<T,T> mapFunc, Predicate<Stream<T>> predicate){
    return applyUntil(in,in->in.map(mapFunc),predicate)
}
Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> predicate){
    if(predicate.test(in)) return in;
    return applyUntil(func.apply(in),func,predicate);
}

不幸的是,mapUntil(stream,mapFunc,s->s.anyMatch(predicate))会导致IllegalStateException: Stream has already been operated on or closed,这是合理的,因为我在同一个流上调用了anyMatchmap。所以我想出了一个不同的applyUntil实现:

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> predicate){
    List<T> collected = in.collect(Collectors.toList());
    if(predicate.test(collected.stream())) return collected.stream()
    return applyUntil(func.apply(collected.stream(),func,predicate);
}

这显然存在很多问题。

  • 它无法处理无限(或非常巨大)的流。在我的特殊情况下,这是可以接受的,但对于如此通用的方法来说,这感觉很糟糕。
  • 它违反了流的本意,因为所有的惰性都被失去了——由于collect(Collectors.toList()),每个数据都必须被计算和存储。

我尝试修改我的代码,以消除第二个问题,重写了applyUntil

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> predicate){
    List<T> collected = in.collect(Collectors.toList());
    return applyUntil(()->collected.stream(),func,predicate);
}
Stream<T> applyUntil(Supplier<Stream<T>> sup, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> pred){
    if(predicate.test(sup.get())) return sup.get();
    return applyUntil(()->func.apply(sup.get()),func,predicate);
}

这个实现确实可以工作,但非常缓慢,特别是当你有一个非常昂贵的函数时。当我更仔细地看它时,我认识到原因:它调用了 predicate.test(collected.stream())predicate.test(func.apply(collected.stream()))predicate.test(func.apply(func.apply(collected.stream())) 等等,导致 func 呼叫 O(n^2) 次,相比需要 n 次呼叫,这不是很好。
在我的天真世界中,应该有一个比这两者更好的解决方案。像这样(只是一个快速的草案,AddFirst - Exists 是 MyStream 的简单惰性实现。我在这段代码的末尾错过了一个类似于 Fork 的类,用于默认的 Java 流):
interface MyStream<T>{
    T get();
    boolean hasNext();
}
class Convert<T> implements MyStream<T>{
     Iterator<T> inner;
     pulic Convert(Interator<T> iter){
          inner=iter;
     }
     public boolean hasNext(){
          return inner.hasNext();
     }
     public T get(){
          return inner.get();
     }
class AddFirst<T> implements MyStream<T>{
     T item;
     MyStream<T> inner;
     boolean used;
     public AddFirst(T t, MyStream<T> prev){
         item=t;
         inner=prev;
         used=false;
     }
     public T get(){
          if(used) return inner.get();
          used=true;
          return item;
     }
     public boolean hasNext(){
         return !used || inner.hasNext();
     }

}
class Filter<T> implements MyStream<T>{
     Predicate<T> filter;
     MyStream<T> inner
     public Filter(Predicate<T> test, MyStream<T> prev){
         filter=test;
         inner=prev;
     }
     public T get(){
          while(true){
              T curr = inner.get(); //if !inner.hasNext, this throws NoSuchElementException
              if(filter.test(curr)) return curr;
          }
     }
     public boolean hasNext(){
         try{
             T item = get();
             inner = new AddFirst(item,inner);
             return true;
         }
         catch(NoSuchElementException e){
             return false;
         }
     }
}
class Map<K,T> implements MyStream<T>{
    MyStream<K> inner;
    Function<K,T> func;
    public Map(Function<K,T> func,MyStream<K> prev){
        this.func=func;
        inner = prev;
    }
    public T get(){
        return func.apply(inner.get());
    }
    public boolean hasNext(){
        return inner.hasNext();
    }
}
class Forall<T> implements Predicate<MyStream<T>>{
    Predicate<T> pred;
    public Forall(Predicate<T> func){
        pred=func;
    }
    public boolean test(MyStream<T> ms){
        while(ms.hasNext()){
            if(!pred.test(ms.get()) return false;
        }
        return true;
    }
}
class Exists<T> implements Predicate<MyStream<T>>{
    Predicate<T> pred;
    public Forall(Predicate<T> func){
        pred=func;
    }
    public boolean test(MyStream<T> ms){
        while(ms.hasNext()){
            if(pred.test(ms.get()) return true;
        }
        return false;
    }
}
class Fork<T>{
    Deque<T> advance;
    MyStream<T> inner;
    boolean ahead;
    MyStream<T> master;
    MyStream<T> slave;
    public Fork(MyStrem<T> prev){
         inner=prev;
         advance= new LinkedList<T>();
         ahead=false;
         master = new ForkStream(true);
         slave = new ForkStream(false);
    }
    public MyStream<T> getMaster(){
        return master;
    }
    public Iterator<T> getMasterIter(){
        return master;
    }
    public MyStream<T> getSlave(){
        return slave;
    }
    public Iterator<T> getSlaveIter(){
        return slave;
    }
    class ForkStream implements MyStream<T>, Iterator<T>{
         boolean role;
         public ForkStream(boolean in){
             role=in;
         }
         public T get(){
              if(role==ahead||advance.size()==0){
                 ahead=role;
                 T item = inner.get();
                 advance.addLast(item);
                 return item;
              }
              else{
                  return advance.removeFirst();
              }
         }
         public boolean hasNext(){
               return (role!=ahead&&advance.size()!=0) || inner.hasNext();
         }
         public T next(){
               return get();
         }
    }
}

使用这些类,我可以重写我的方法:

Stream<T> applyUntil(Stream<T> in, Function<Stream<T>,Stream<T>> func,
  Predicate<Stream<T>> predicate){
    Fork<T> fork = new Fork(new Convert<T>(in.iterator()));
    Stream<T> master = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(fork.getMasterIter(),0),false);
    Stream<T> slave = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(fork.getSlaveIter(),0),false);
    if(predicate.test(master)) return slave
    return applyUntil(func.apply(slave,func,predicate);
}

这个方法适用于无限流,仍然是惰性的,可以重复使用已经计算出来的值。看起来对于任何目的都很合适。

编辑: 当我试图解释为什么这个代码块不能编译时,我找到了一种使其编译的方法。它仍然不是很好,失去了很多流的魔力,也不是线程安全的。此外,MyStream应该有一个close方法,以表示您不再对任何数据感兴趣-因此Fork不必为您保存它。所以另一个问题在我的脑海中出现了:你能从像迭代器那样的东西创建一个`j.u.stream.Stream`,告诉它的迭代器它不再对任何数据感兴趣(因为短路)吗?

因此,我的问题是:JDK8没有外部库是否有类似Fork的东西,保持更多的魔力存在? 如果是:哪个类/方法可以帮助我? 如果没有:为什么没有?并且:你如何自己实现它,保留尽可能多的魔力?

感谢阅读,抱歉这篇长文 :/
Alex


非常感谢您提出这个有趣的问题。我正在努力理解您的解决方案,您是否介意更新代码以使其能够编译通过?例如:fork.master 应该是一个 java.util.stream.Stream<T>.slave() 也应该是同样的类型。 - tkachuko
@tkachuko 当我试图解释为什么这是不可能的时候,我想到了一个编译版本,你现在可以看到它。它仍然不是很好,并引出了另一个问题 - 但至少现在它可以工作了,谢谢 :) - alex berne
2个回答

2

Java 9将有 takeWhiledropWhile。结合使用 Stream.concat,你可以做到:

Stream.concat(
  sourceCollection.stream().takeWhile(predicate).map(mapper), 
  sourceCollection.stream().dropWhile(predicate.negate())
)

这种方法不会为中间集合消耗额外的内存,但是如果在到达第二个流之前无法短路,则需要遍历前缀两次,从而消耗CPU时间。

要实现更高效的解决方案,您可以通过提取stream.spliterator()并将其包装成Spliterator的自定义子类(如果您对实现并行支持不太关心,则使用j.u.Spliterators.AbstractSpliterator),然后使用j.u.s.StreamSupport.stream(Spliterator<T>, boolean)将其包装回流中来实现中间有状态操作,例如条件映射函数。


在我的看法中,takeWhile需要一个 Predicate<T>,但是我正在寻找一个接受 Predicate<Stream<T>> 的“修改流直到”函数 - 所以我不知道如何在我的情况下使用 dropWhiletakeWhile - alex berne
1
我的回答的第二句解释了如何使用它...哪个部分不清楚? - the8472
1
好的,看起来我没有理解你问题的意图。你的意思是想对每个项目重复应用映射函数,直到满足某些条件为止吗?因为你不能在不消耗它的情况下测试整个流上的任何内容,即你不能根据稍后发生的某些事情在第一个项目上做出决策,除非你将整个流捕获到集合中。也许你应该写一个迭代样式的例子,即带有嵌套循环的例子? - the8472
1
看起来你在处理可能存在两个无限的情况。递归地应用映射函数,可能会无限次地应用到潜在的无限元素上。虽然可以进行短路处理,但有许多不同的策略可以尝试(深度优先、广度优先、平衡的广度/深度推进)。如果没有了解底层问题,它们可能会失败或者效率低下。 - the8472
1
然后你可以简化问题,不对整个流应用谓词,而只是应用在第一个元素上,因为只有满足条件的第一个元素时,你的逻辑才能终止,否则它会无限下降。 - the8472
显示剩余3条评论

1
首先,您对性能问题的分析需要进行更正。在您的第二个变体中,无论多少次调用func.apply,都没有任何作用,因为该函数不执行任何操作。它所做的只是在稍后处理的流上链接另一个中间操作,但它取决于该处理有多大性能影响。
在这方面,您过于关注大型甚至无限流上的短路操作,在您的特定设置中很早就完成。基本问题是每个筛选步骤可能会处理所有流元素,并且必须在下一个筛选步骤之前完成它,而且有不可预测的筛选步骤数量,这一点并没有改变。
如果您始终使用谓词和流元素的组合,允许早期短路,那么您的第三个解决方案就会表现出色,但请注意,在这些情况下,您的第二个解决方案的问题不是嵌套的函数应用,而是您仍然将整个初始流收集到List中。当您跳过该步骤并首先调用接受Supplier<Stream<T>>的方法时,您就不会遇到这些问题。

那么,将元素缓冲到Deque中还是不缓冲,这取决于您链式操作到流的实际中间操作的权重。请注意,您可以在不镜像的情况下使用Stream API来执行第3种方法中所做的操作:

/** returns a {@code List} containing two {@code Stream}s */
public static <T> List<Stream<T>> fork(Stream<T> source) {
    Spliterator<T> srcSp=source.spliterator();
    ArrayDeque<T> deque=new ArrayDeque<>();
    Boolean[] ahead={ null };
    final class Branch extends Spliterators.AbstractSpliterator<T> {
        private final Boolean type;
        Branch(Boolean b) {
            super(srcSp.estimateSize(), srcSp.characteristics());
            type=b;
        }
        public boolean tryAdvance(Consumer<? super T> action) {
            if(deque.isEmpty() || ahead[0]==type) {
                if(!srcSp.tryAdvance(deque::push)) return false;
                ahead[0]=type;
                action.accept(deque.peek());
                return true;
            }
            action.accept(deque.removeLast());
            return true;
        }
    }
    return Arrays.asList(
        StreamSupport.stream(new Branch(true),  false),
        StreamSupport.stream(new Branch(false), false));
}
public static <T> Stream<T> applyUntil(
        Stream<T> in, Function<Stream<T>,Stream<T>> func, Predicate<Stream<T>> predicate) {
    List<Stream<T>> fork = fork(in);
    return predicate.test(fork.get(0))? fork.get(1):
        applyUntil(func.apply(fork.get(1)), func, predicate);
}

但正如所说,它只有在短路操作的小角落中对您有所帮助,除非您有非常昂贵的中间操作,否则它不会比重新应用第二种方法更快,如果您消除了将整个流收集到List的初始过程。

这基本上是我正在寻找的解决方案,谢谢。但还有一些问题:我是否需要synchronize tryAdvance?并且:当流不再请求任何数据时,我是否可以捕获“我不再感兴趣”的信号?否则,尽管在返回的Stream上调用了终端操作,但此实现仍将存储所有内容。 - alex berne
1
由于这会创建两个独立的流,使它们线程安全比简单的synchronized语句要复杂得多。您必须确保对源、deque和ahead标记的访问始终由共享锁(this不合适)进行保护,而消费者的accept方法的执行应该在不持有锁的情况下发生。而且没有结束信号可以捕获,流不会报告那个。您可以使用关闭操作,但这需要您的应用程序代码显式关闭;流不会自动执行此操作。 - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接