你能将一个流分成两个流吗?

196

我有一个用Java 8流表示的数据集:

Stream<T> stream = ...;

我可以看到如何过滤它以获得随机子集 - 例如

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我也可以看到如何将这个流减少,例如,得到代表数据集的两个随机一半的列表,然后将它们转换回流。 但是,有没有一种直接从初始流生成两个流的方法?类似于

(heads, tails) = stream.[some kind of split based on filter]

感谢任何见解。


2
Mark的回答比Louis的回答更有帮助,但我必须说Louis的回答更与原始问题相关。问题相当专注于将Stream转换为多个Stream 而不进行中间转换的可能性,尽管我认为达到这个问题的人实际上正在寻找实现此目标的方法,而不考虑这种限制,这是Mark的答案。这可能是因为标题中的问题与描述中的问题不同 - devildelta
11个回答

386

这可以使用一个收集器来完成。

  • 对于两个类别,使用Collectors.partitioningBy()工厂函数。

这将创建一个Map<Boolean, List>,并根据 Predicate 将项目放入其中一个列表。

注意:由于流需要完整消耗,因此无法在无限流上工作。而且由于流已经被消费,该方法只是把它们放在列表中,而不是创建一个新的带有内存的流。如果需要将列表作为输出,请随时对其进行流操作。

另外,在您提供的仅包含头信息的示例中也不需要迭代器。

  • 二分拆分的代码如下:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • 如果需要更多的分类,可以使用Collectors.groupingBy() 工厂方法。
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是Stream,而是原始流之一,比如IntStream,那么这个.collect(Collectors)方法就不可用了。你必须手动地完成,没有收集器工厂。它的实现看起来像这样:

[自 2020-04-16 起的示例 2.0]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

在这个例子中,我使用初始集合的完整大小(如果有的话)来初始化ArrayLists。这可以防止甚至在最坏的情况下出现调整大小事件,但可能会耗费2NT空间(N = 初始元素数,T = 线程数)。为了在速度和空间之间取得平衡,您可以省略它或使用您最好的猜测,比如一个分区中预期的最高元素数量(通常略高于平衡分割的N/2)。
我希望我的做法不会冒犯任何人,因为我使用了Java 9的方法。对于Java 8版本,请查看编辑历史记录。

2
很好。然而,在并行流的情况下,IntStream 的最后一个解决方案不是线程安全的。解决方案比你想象的要简单得多... stream.boxed().collect(...);!它会按照所述进行操作:将原始的 IntStream 转换为装箱的 Stream<Integer> 版本。 - YoYo
40
这应该被接受作为答案,因为它直接解决了原帖的问题。 - ejel
33
我希望 Stack Overflow 能够让社区在找到更好的答案时覆盖已选择的答案。 - GuiSim
4
不确定这是否回答了问题。问题要求将流拆分为流,而不是列表。 - AlikElzin-kilaka
2
累加器函数过于冗长。你可以使用(map, x) -> map.get(p.test(x)).add(x)代替(map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }。此外,我不明白为什么collect操作不能是线程安全的。它的工作方式与Collectors.partitioningBy(p)非常接近,并且正常工作。但是,当不使用boxed()时,我会使用IntPredicate而不是Predicate<Integer>,以避免两次装箱。 - Holger
显示剩余3条评论

28

我自己遇到了这个问题,我认为分叉流有一些可以证明有效的用例。我编写了下面的代码作为消费者,以便它不做任何事情,但您可以将其应用于函数和其他任何可能遇到的内容。

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

现在您的代码实现可能如下所示:
personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));

20
很不幸,您所要求的做法直接违反了Java中流(Stream)文档的规定:

流应该只被操作一次(调用中间或终端的流操作)。这排除了例如“分叉”流的操作,其中相同的源提供了两个或更多的管道,或者对同一流进行多次遍历。

如果您真正想实现这种类型的行为,可以使用peek或其他方法来解决。在这种情况下,您应该复制您的流并适当地筛选每个副本,而不是尝试从同一原始流源备份两个流。

然而,您可能需要重新考虑是否使用Stream是您的最佳方案。


9
Javadoc 的措辞并不排除将流分成几个部分,只要一个流元素仅出现在其中的 一个 部分中。 - Thorbjørn Ravn Andersen
2
@ThorbjørnRavnAndersen 我不确定复制流项目是否是分叉流的主要障碍。主要问题在于分叉操作本质上是一个终端操作,因此当你决定进行分叉时,基本上是创建了某种集合。例如,我可以编写一个方法 List<Stream> forkStream(Stream s),但我的结果流至少部分由集合支持,而不是直接由底层流支持,与filter不同,后者不是终端流操作。 - Trevor Freeman
10
相比于https://github.com/ReactiveX/RxJava/wiki,这是我认为Java流略显不足的原因之一。流的重点是对潜在无限元素集应用操作,而现实世界的操作经常需要拆分、复制和合并流。 - Usman Ismail
1
@TrevorFreeman:为什么不可能呢?请参考 https://dev59.com/mmIj5IYBdhLWcg3wv3e2#66526781。 - serv-inc
1
@serv-inc 我不认为我说过任何事情是不可能的,但你链接的答案是关于收集流(和拆分收集器)的。如果我们只想要收集/终止流,那么解决方案就很简单了,困难(也许是不可能的,取决于你的定义)的部分实际上是在不终止/收集流的情况下拆分流。想象一下一个无限的数据流,它不断地到达,而你希望从这个单一的流中分割出两个实际的流对象... 这是一个难以解决的问题。 - Trevor Freeman
显示剩余3条评论

18
< p > < em >自从Java 12以来,您可以通过< code >teeing一次性获得两个< /em >< code >Stream < /code >。< br /> 在100次抛硬币中计算正面和反面的数量< /p >
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
List<Long> list = Stream.iterate(0, i -> coin.nextInt())
    .limit(100).collect(teeing(
        filtering(i -> i == 1, counting()),
        filtering(i -> i == 0, counting()),
        (heads, tails) -> {
          return(List.of(heads, tails));
        }));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));

获取示例: 正面:51 反面:49


https://docs.oracle.com/en/java/javase/12/docs/api/java.base/java/util/stream/Collectors.html#teeing(java.util.stream.Collector,java.util.stream.Collector,java.util.function.BiFunction) - Matthew
1
这似乎是分割流的正确答案。 - cody.tv.weber

10
并非完全如此。一个流无法生成两个Stream,这是没有意义的——如果你需要在迭代一个流时同时生成另一个流,那么该怎么办呢?流只能被操作一次。
但是,如果你想将它们转储到列表或其他东西中,可以执行以下操作:
stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));

82
为什么这没什么意义呢?因为一个流是一条管道,所以它无法创建两个原始流的生产者,我认为这可以通过提供两个流的收集器来解决。 - Brett Ryan
38
不安全的线程。尝试直接向集合中添加内容是错误的建议,这就是为什么我们有 stream.collect(...),它配备了预定义的线程安全的Collectors,即使在非线程安全的集合上也能很好地工作(没有同步锁争用)。@MarkJeronimus 给出了最佳答案。 - YoYo
1
@JoD 如果头和尾是线程安全的,则它是线程安全的。此外,假设使用非并行流,仅顺序不保证,因此它们是线程安全的。解决并发问题取决于程序员,因此如果集合是线程安全的,则此答案完全适用。 - Nicolas
1
@Nixon,现在有更好的解决方案,这种代码不太适合。使用这种代码可能会导致不良先例,使其他人以错误的方式使用它。即使没有使用并行流,也只有一步之遥。良好的编码实践要求我们在流操作期间不维护状态。接下来我们要做的是在像Apache Spark这样的框架中编写代码,同样的实践确实会导致意外的结果。这是一个创造性的解决方案,我承认,我自己不久前可能会写出这样的代码。 - YoYo
1
@JoD 这不是一个更好的解决方案,事实上它更加低效。这种思维方式最终会得出一个错误的结论,即所有的集合默认都应该是线程安全的,以防止意外后果。 - Nicolas
显示剩余3条评论

8
这与Stream的一般机制相违背。例如,你可以像你想要的那样将Stream S0拆分为Sa和Sb。在Sa上执行任何终端操作(如count())都将“消耗”S0中的所有元素。因此,Sb失去了其数据源。
以前,Stream有一个tee()方法,它将流复制到两个流中。但现在已经被移除了。
虽然Stream有一个peek()方法,但你可能可以使用它来实现你的要求。

1
“peek” 就是以前的 “tee”。 - Louis Wasserman
1
在Java 12中,Collectors新增了一个名为teeing()的方法,但是这个方法有点难以管理。可以参考这里的示例。 - Kaplan

6

不完全准确,但您可以通过调用Collectors.groupingBy()来完成需要的操作。您可以创建一个新的集合,然后在该新集合上实例化流。


2

这是我能想到的最好的答案了。

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

这个程序接收一串整数,并将它们在数字5处分割。对于大于5的数字,仅筛选偶数并将它们放入列表中。对于其余数字,使用 | 符号将它们连接起来。

输出结果:

 ([6, 8],0|1|2|3|4|5)

这并不是理想的做法,因为它会将所有内容收集到中间集合中,打破流程(而且参数太多了!)


2

我在寻找过滤流中特定元素并将其记录为错误的方法时,偶然发现了这个问题。因此,我并不需要太多地拆分流,只需要使用不显眼的语法将一个预期的终止操作附加到谓词上即可。以下是我想出来的代码:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}

0

使用 Lombok 的简短版本

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接