Java 8中使用流(仅使用流)的流式笛卡尔积

12

我想创建一个方法,用于生成由多个给定流的笛卡尔积元素组成的流(通过二元运算符汇总到相同类型)。请注意,参数和结果都是流,而不是集合。

例如,对于两个流{A,B}{X,Y},我希望它生成值流{AX,AY,BX,BY}(简单串联用于聚合字符串)。 到目前为止,我已经编写了以下代码:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, Stream<T>... streams) {
    Stream<T> result = null;

    for (Stream<T> stream : streams) {
        if (result == null) {
            result = stream;
        } else {
            result = result.flatMap(m -> stream.map(n -> aggregator.apply(m, n)));
        }
    }

    return result;
}

这是我想要的使用案例:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Stream.of("A", "B"), 
  Stream.of("X", "Y")
);

System.out.println(result.collect(Collectors.toList()));

期望的结果:AX,AY,BX,BY

另一个例子:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Stream.of("A", "B"), 
  Stream.of("K", "L"), 
  Stream.of("X", "Y")
);

期望结果:AKX、AKY、ALX、ALY、BKX、BKY、BLX、BLY

然而,当我运行代码时,出现以下错误:

IllegalStateException: 流已经被操作或关闭

流在哪里被消耗了?是通过flatMap吗?它可以容易地修复吗?


可能是如何使用Java 8流创建笛卡尔积?的重复问题。 - mkobit
@mkobit:它很相似,但我认为它不是重复的,因为这里你不是在参数中使用集合,而是流,这可能会导致不同的方法。 - voho
3个回答

12

在你的例子中传递流永远不比传递列表更好:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, List<T>... lists) {
    ...
}

并像这样使用它:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Arrays.asList("A", "B"), 
  Arrays.asList("K", "L"), 
  Arrays.asList("X", "Y")
);

在这两种情况下,您都从varargs创建了一个隐式数组,并将其用作数据源,因此惰性是虚构的。您的数据实际上存储在数组中。

在大多数情况下,生成的笛卡尔积流比输入要长得多,因此实际上没有理由使输入变得惰性。例如,有五个包含五个元素的列表(共25个元素),则您将拥有3125个元素的结果流。因此,在内存中存储25个元素并不是非常大的问题。实际上,在大多数实际情况下,它们已经存储在内存中。

为了生成笛卡尔积流,您需要不断“倒带”所有流(除第一个流外)。要倒带,流应该能够一遍又一遍地重新检索原始数据,可以通过某种方式对它们进行缓冲(您不喜欢的方式)或者再次从来源(收集、数组、文件、网络、随机数等)抓取它们,并再次执行所有中间操作。如果您的来源和中间操作很慢,那么惰性解决方案可能比缓冲解决方案慢得多。如果您的来源无法再次生成数据(例如,无法产生与之前生成的相同数字的随机数生成器),则您的解决方案将是不正确的。

尽管完全惰性的解决方案是可能的。只需使用流供应商即可:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator,
                                       Supplier<Stream<T>>... streams) {
    return Arrays.stream(streams)
        .reduce((s1, s2) -> 
            () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2))))
        .orElse(Stream::empty).get();
}
该解决方案有趣,因为我们创建和减少供应商流来获取最终的供应商,并最终调用它。用法:

该解决方案有趣,因为我们创建和减少供应商流来获取最终的供应商,并最终调用它。用法:

Stream<String> result = cartesian(
          (a, b) -> a + b, 
          () -> Stream.of("A", "B"), 
          () -> Stream.of("K", "L"), 
          () -> Stream.of("X", "Y")
        );
result.forEach(System.out::println);

谢谢你的好答案!我喜欢两个解决方案,你是正确的。在实际情况中,可能没有合理的例子可以优先选择输入流的方式。 - voho
我只是关心这里的效率。看起来你正在有效地创建一个嵌套的供应商堆栈,然后进行调用。 创建中间结构,例如List<T>...列表, 会比使用streams数组更好吗? - Roland

4

stream 在第二次迭代中在 flatMap 操作中被使用。因此,每次对结果进行 map 时都必须创建一个新的流。因此,您必须预先收集 stream,以获得在每次迭代中都有一个新的流。

private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
    Stream<T> result = null;
    for (Stream<T> stream : streams) {
        if (result == null) {
            result = stream;
        } else {
            Collection<T> s = stream.collect(Collectors.toList());
            result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n)));
        }
    }
    return result;
}

甚至更简洁:
private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
    return Arrays.stream(streams).reduce((r, s) -> {
        List<T> collect = s.collect(Collectors.toList());
        return r.flatMap(m -> collect.stream().map(n -> aggregator.apply(m, n)));
    }).orElse(Stream.empty());
}

1
非常感谢!你认为有没有不使用缓冲的方法来做到这一点? - voho
@voho 我认为这是不可能的。 - Flown
我发现奇怪的是这个可以运行:Collection<T> s = stream.collect(Collectors.toList()); result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n))); 但这个却不行:Stream<T> s = stream.collect(Collectors.toList()).stream(); result = result.flatMap(m -> s.map(n -> aggregator.apply(m, n))); - 虽然它们是一样的?! - voho
好的,我会将您的答案标记为正确,因为它实现了我的需求。现在我想知道是否有人提出不需要集合的解决方案... - voho
2
它与语句中的收集器不兼容,因为您将同一流连接到多个“collect”终端操作,这是不允许的。 这就是实际问题。 - Flown

2
你可以创建一个方法,返回一个对象的List<T>,而不对它们进行聚合。算法相同:在每个步骤中,将第二个流的元素收集到一个列表中,然后将它们附加到第一个流的元素上。
聚合器在方法外部。
@SuppressWarnings("unchecked")
public static <T> Stream<List<T>> cartesianProduct(Stream<T>... streams) {
    // incorrect incoming data
    if (streams == null) return Stream.empty();
    return Arrays.stream(streams)
            // non-null streams
            .filter(Objects::nonNull)
            // represent each list element as SingletonList<Object>
            .map(stream -> stream.map(Collections::singletonList))
            // summation of pairs of inner lists
            .reduce((stream1, stream2) -> {
                // list of lists from second stream
                List<List<T>> list2 = stream2.collect(Collectors.toList());
                // append to the first stream
                return stream1.flatMap(inner1 -> list2.stream()
                        // combinations of inner lists
                        .map(inner2 -> {
                            List<T> list = new ArrayList<>();
                            list.addAll(inner1);
                            list.addAll(inner2);
                            return list;
                        }));
            }).orElse(Stream.empty());
}

public static void main(String[] args) {
    Stream<String> stream1 = Stream.of("A", "B");
    Stream<String> stream2 = Stream.of("K", "L");
    Stream<String> stream3 = Stream.of("X", "Y");
    @SuppressWarnings("unchecked")
    Stream<List<String>> stream4 = cartesianProduct(stream1, stream2, stream3);
    // output
    stream4.map(list -> String.join("", list)).forEach(System.out::println);
}

String.join在这种情况下是一种聚合器。

输出:

AKX
AKY
ALX
ALY
BKX
BKY
BLX
BLY

另请参阅:如何将多个流的笛卡尔积转换为列表中的每个元素?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接