Java 8流的条件处理

39

我希望将一个流分成两个或更多子流,并以不同的方式处理元素。例如,一个(大型)文本文件可能包含A类型和B类型的行,因此我想做类似以下操作:

File.lines(path)
.filter(line -> isTypeA(line))
.forEachTrue(line -> processTypeA(line))
.forEachFalse(line -> processTypeB(line))

以上是我尝试对情况进行抽象的结果。实际上,我有一个非常大的文本文件,每一行都会使用正则表达式进行测试;如果通过了,则进行处理,而如果被拒绝,则我想更新一个计数器。这种对被拒绝字符串的进一步处理是我不能简单地使用过滤器的原因。

是否有任何合理的方法可以使用流来完成此操作,还是必须回退到循环中?(我希望这也能并行运行,所以流是我的首选)。


4
你可以使用 partitioningBy 收集器,但需要一个临时 map 数据持有者。 - Tunaki
8个回答

27

Java 8的流并不支持这种操作。来自JDK

一个流只能被操作一次(调用中间或终端流操作)。这排除了例如“分叉”流的情况,其中相同的源为两个或多个管道提供输入,或者对同一流进行多次遍历。

如果你可以将其存储在内存中,则可以使用Collectors.partitioningBy,如果只有两种类型,则可以使用Map<Boolean, List>。否则,请使用Collectors.groupingBy


24
只需要测试每个元素,并根据结果进行操作。
lines.forEach(line -> {
    if (isTypeA(line)) processTypeA(line);
    else processTypeB(line);
});

这种行为可以隐藏在一个辅助方法中:
public static <T> Consumer<T> branch(Predicate<? super T> test, 
                                     Consumer<? super T> t, 
                                     Consumer<? super T> f) {
    return o -> {
        if (test.test(o)) t.accept(o);
        else f.accept(o);
    };
}

然后使用方法将如下所示:
lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB));

旁注

Files.lines() 方法不会关闭底层文件,因此必须按照以下方式使用:

try (Stream<String> lines = Files.lines(path, encoding)) {
  lines.forEach(...);
}

Stream类型的变量让我有些不安,所以我更喜欢直接管理BufferedReader

try (BufferedReader lines = Files.newBufferedReader(path, encoding)) {
    lines.lines().forEach(...);
}

这不会起作用,因为lambda主体应该是一个语句。你会在这里遇到编译器错误 - 布尔值无法转换为void。 - Shell Scott
1
@VolodymyrReda 谢谢!我已经更新了一个可行的解决方案。 - erickson

6

虽然在行为参数中使用副作用是不鼓励的,但只要没有干扰,它们并不被禁止,因此最简单的解决方案(虽然不是最干净的)是在过滤器中直接计数:

AtomicInteger rejected=new AtomicInteger();
Files.lines(path)
    .filter(line -> {
        boolean accepted=isTypeA(line);
        if(!accepted) rejected.incrementAndGet();
        return accepted;
})
// chain processing of matched lines

只要您处理所有项,结果就会保持一致。仅当您使用短路终端操作(在并行流中)时,结果才会变得不可预测。
更新原子变量可能不是最有效的解决方案,但在从文件处理行的上下文中,开销可能微不足道。
如果您想要一个干净、并行友好的解决方案,一个通用的方法是实现一个收集器,该收集器可以基于条件结合两个收集操作的处理。这需要您能够将下游操作表达为收集器,但大多数流操作都可以表达为收集器(而且趋势正在朝着可能表达所有操作的方式发展,即Java 9将添加当前缺失的过滤和平铺映射)。
您需要一种成对类型来保存两个结果,因此假设有一个草图。
class Pair<A,B> {
    final A a;
    final B b;
    Pair(A a, B b) {
        this.a=a;
        this.b=b;
    }
}

组合收集器的实现将如下所示:

public static <T, A1, A2, R1, R2> Collector<T, ?, Pair<R1,R2>> conditional(
        Predicate<? super T> predicate,
        Collector<T, A1, R1> whenTrue, Collector<T, A2, R2> whenFalse) {
    Supplier<A1> s1=whenTrue.supplier();
    Supplier<A2> s2=whenFalse.supplier();
    BiConsumer<A1, T> a1=whenTrue.accumulator();
    BiConsumer<A2, T> a2=whenFalse.accumulator();
    BinaryOperator<A1> c1=whenTrue.combiner();
    BinaryOperator<A2> c2=whenFalse.combiner();
    Function<A1,R1> f1=whenTrue.finisher();
    Function<A2,R2> f2=whenFalse.finisher();
    return Collector.of(
        ()->new Pair<>(s1.get(), s2.get()),
        (p,t)->{
            if(predicate.test(t)) a1.accept(p.a, t); else a2.accept(p.b, t);
        },
        (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)),
        p -> new Pair<>(f1.apply(p.a), f2.apply(p.b)));
}

比如,可以将匹配项收集到列表中并计算不匹配项的数量,像这样:

Pair<List<String>, Long> p = Files.lines(path)
  .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting()));
List<String> matching=p.a;
long nonMatching=p.b;

收集器对并行处理友好,并允许任意复杂的委托收集器,但请注意,使用当前实现,Files.lines 返回的流在并行处理时可能性能不佳,与 {{link1:“Reader#lines()由于其分割器中的不可配置批处理大小策略而无法并行化”}} 相比。改进计划在Java 9发布时进行。

3
这里有一种方法(虽然忽略了将条件处理强制变成流的警告),它将一个谓词和消费者包装成一个带副作用的谓词:
public static class StreamProc {

    public static <T> Predicate<T> process( Predicate<T> condition, Consumer<T> operation ) {
        Predicate<T> p = t -> { operation.accept(t); return false; };
        return (t) -> condition.test(t) ? p.test(t) : true;
    }

}

然后对流进行过滤:

someStream
    .filter( StreamProc.process( cond1, op1 ) )
    .filter( StreamProc.process( cond2, op2 ) )
    ...
    .collect( ... )

流中尚未处理完的元素。

例如,使用外部迭代进行典型文件系统遍历的方式如下:

File[] files = dir.listFiles();
for ( File f : files ) {
    if ( f.isDirectory() ) {
        this.processDir( f );
    } else if ( f.isFile() ) {
        this.processFile( f );
    } else {
        this.processErr( f );
    }
}

使用流和内部迭代,代码可以变为以下形式:
Arrays.stream( dir.listFiles() )
    .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) )
    .filter( StreamProc.process( f -> f.isFile(), this::processFile ) )
    .forEach( f -> this::processErr );

我希望Stream能够直接实现process方法,这样我们就能够...
Arrays.stream( dir.listFiles() )
    .process( f -> f.isDirectory(), this::processDir ) )
    .process( f -> f.isFile(), this::processFile ) )
    .forEach( f -> this::processErr );

有何想法?

3
我处理这个问题的方法并不是要将其分开,而是编写以下内容。
Files.lines(path)
   .map(line -> {
      if (condition(line)) {
        return doThingA(line);
      } else {
        return doThingB(line);
      }
   })...

具体细节取决于您想要做什么以及您计划如何实现它。


1
只有当doThing {A,B}实际上是函数时才可以这样做。如果意图是为类型A和B具有不同的副作用,则不应该使用此方法。 - Brian Goetz
1
@BrianGoetz 那我可能会使用 peek - Louis Wasserman

2

看起来实际上你想要处理每一行,但是根据某种条件(类型)以不同的方式进行处理。

我认为比较实用的实现方法是:

public static void main(String[] args) {
    Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println);
}

static Supplier<Integer> processor(int i) {
    return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i);
}

static boolean tellType(int i) {
    return i % 2 == 0;
}

static int processTypeA(int i) {
    return i * 100;
}

static int processTypeB(int i) {
    return i * 10;
}

1

Well, you can simply do

Counter counter = new Counter();
File.lines(path)
    .forEach(line -> {
        if (isTypeA(line)) {
            processTypeA(line);
        }
        else {
            counter.increment();
        }
    });

这段代码的风格不太像函数式编程,但它以类似于您示例的方式完成任务。当然,如果并行执行,Counter.increment()processTypeA() 都必须是线程安全的。

0

@tom

这个怎么样:

Arrays.stream( dir.listFiles() )
    .peek(  f -> { if(f.isDirectory()) { processDir(f); }} )
    .peek(  f -> { if(f.isFile())      { processFile(f);}}) )
    .forEach( f -> this::processErr );

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接