将Java flatmap的Iterator<Pair<Stream<A>, Stream<B>>>转换为Pair<Stream<A>, Stream<B>>

14
我尝试实现一个具有以下签名的方法:

```

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator);

该方法的目标是将每个流类型压平为单个流并将输出包装在一对中。我只有一个Iterator(而不是Iterable),并且不能更改方法签名,因此必须在单次迭代中执行压平操作。

我当前最好的实现是

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>> iterator) {
    Stream<A> aStream = Stream.empty();
    Stream<B> bStream = Stream.empty();
    while(iterator.hasNext()) {
        Pair<Stream<A>, Stream<B>> elm = iterator.next();
        aStream = Stream.concat(aStream, elm.first);
        bStream = Stream.concat(bStream, elm.second);
    }
    return Pair.of(aStream, bStream);
}

但虽然这在技术上是正确的,但我有两个原因不太满意:

  1. Stream.concat 警告不要做这种事情,因为它可能会导致 StackOverflowError
  2. 从风格上讲,如果可能的话,我宁愿它是纯函数式的,而不是不得不循环遍历迭代器并重新分配流。

感觉 Stream#flatMap 在这里应该很合适(在使用 Guava's Streams.stream(Iterator) 将输入 Iterator 转换为 Stream 后),但似乎由于中间的 Pair 类型而无法工作。

另一个要求是任何迭代器/流都可能非常大(输入可能包含从一对非常大的流到许多单项流的任何位置),因此解决方案理想情况下不应将结果收集到内存中的集合中。

3个回答

11

好的,番石榴的Streams.stream并不是什么魔法,实际上内部实现只是:

StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);

也许你不需要将它与你的方法进行链接,而是可以直接使用它。

而你可以使用Stream.Builder来实现这一点:

public static <A, B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {

    Stream.Builder<Stream<A>> builderA = Stream.builder();
    Stream.Builder<Stream<B>> builderB = Stream.builder();

    iterator.forEachRemaining(pair -> {
        builderA.add(pair.first);
        builderB.add(pair.second);
    });

    return Pair.of(builderA.build().flatMap(Function.identity()), builderB.build().flatMap(Function.identity()));
}

1
你可以用 iterator.forEachRemaining() 来替换 StreamSupport.stream(…).forEach() - Didier L
1
@DidierL 好观点,谢谢;顺便提一下,这仍然收集到一个有序的集合(几乎像您示例中的List),但它是非公共的SpinnedBuffer - Eugene
Spined”而不是“Spinned”。由于形式,当二维数组在内部使用时,这比ArrayList稍微更有效率,因为它不需要在增加数组容量操作时进行数据复制。 - Holger
@Holger,我只是在工作中听说这比ArrayList更有效率,但我没有看过代码,所以我无法确定我是否会有时间去研究它。感谢您至少给了一些提示。 - Eugene
1
很简单。ArrayList有一个一维数组,所以当其容量用尽时,将分配一个新的更大的数组并复制内容。SpinedBuffer有一个数组的数组,最初未填充。当当前目标数组已满时,它将存储在数组的数组中,并分配一个新的更大的数组作为新的目标数组,但不进行复制。这种设计会使插入或删除变得复杂,但这些操作也不受支持。但是,在这个已知大小的二维数组上进行流处理仍然很快。 - Holger

7
避免收集整个Iterator(就像你在问题中实际做的那样)相当困难,因为您不知道生成的流将如何被消耗:一个可能完全被消耗,需要完全消耗迭代器,而另一个则根本没有被消耗,需要跟踪所有产生的对-有效地将它们收集到某个地方。
仅当流以更或多或少的“速度”被消耗时,您才能从不收集整个迭代器中受益。但这种消耗意味着要么使用其中一个结果流的迭代器,要么在并行线程中消耗流-这将需要额外的同步。
因此,我建议将所有对收集到List中,然后从该列表生成新的Pair
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    Iterable<Pair<Stream<A>, Stream<B>>> iterable = () -> iterator;
    final List<Pair<Stream<A>, Stream<B>>> allPairs =
        StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());

    return Pair.of(
            allPairs.stream().flatMap(p -> p.first),
            allPairs.stream().flatMap(p -> p.second)
    );
}

这不会消耗任何原始流,同时保持简单的解决方案,避免嵌套流连接。

3

首先,这将是您所说的更偏向于函数式编程风格的代码的“更加功能化”版本:

<A, B> Pair<Stream<A>, Stream<B>> flattenFunctional(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    return Streams.stream(iterator)
        .reduce(Pair.of(Stream.empty(), Stream.empty()),
            (a, b) -> Pair.of(
                Stream.concat(a.first, b.first),
                Stream.concat(a.second, b.second)));
}

关于可能出现的StackOverflowError警告仍然适用于此处,因为使用了Stream.concat。
为避免这种情况并考虑大型数据集的性能和内存使用,我有以下建议(完全不具备功能)。您可以创建一对自定义迭代器(针对A、B类型),并使用Guava的Streams.stream()获取一对流。将这些自定义迭代器放在一个带有一对迭代器堆栈的类中。例如,在第一对迭代器中,如果Stream的元素少于Stream,则在Stream耗尽后,调用iterator.next()并将B的迭代器推入其堆栈。以下是带有一对堆栈的类(添加构造函数):
class PairStreamIterator<A, B> {
    private final Iterator<Pair<Stream<A>, Stream<B>>> iterator;
    private final Queue<Iterator<A>> stackA = new ArrayDeque<>();
    private final Queue<Iterator<B>> stackB = new ArrayDeque<>();

    Iterator<A> getItA() {
        return new Iterator<A>() {
            @Override public boolean hasNext() {
                if (!stackA.isEmpty() && !stackA.peek().hasNext()) {
                    stackA.remove();
                    return hasNext();
                } else if (!stackA.isEmpty() && stackA.peek().hasNext()) {
                    return true;
                } else if (iterator.hasNext()) {
                    Pair<Stream<A>, Stream<B>> pair = iterator.next();
                    stackA.add(pair.first.iterator());
                    stackB.add(pair.second.iterator());
                    return hasNext();
                }
                return false;
            }

            @Override public A next() {
                return stackA.peek().next();
            }
        };
    }    
    // repeat for Iterator<B>
}

还有flatten方法:

<A, B> Pair<Stream<A>, Stream<B>> flattenIt(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    final PairStreamIterator<A, B> pair = new PairStreamIterator<>(iterator);
    return Pair.of(Streams.stream(pair.getItA()), Streams.stream(pair.getItB()));
}

如果您以相同的速率消耗flatten结果对中的2个流,则2个堆栈通常会容纳1或2个迭代器。最坏的情况是,如果您计划完全消耗结果对中的一个流,然后再消耗另一个流。在这种情况下,第二个被展平流所需的所有迭代器都将保留在迭代器的堆栈中。恐怕我认为没有任何绕过这个问题的方法。由于这些存储在内存中的堆上,您不会遇到StackOverflowError,尽管您仍然可能会遇到OutOfMemoryError

hasNext中递归使用可能存在潜在问题。只有当您在输入中遇到许多连续的空流时,才会出现问题。


我认为最好使用Queue<Stream<A/B>>,以避免获取每个传入Stream的迭代器。然后你将拥有Iterator<Stream<A/B>> getItA/B(),但这不会成为问题,因为你只需要从那些迭代器创建的流中fratMap(identity())。它们的实现可能也会更简单,因为您不需要在hasNext()中进行remove()和递归:它只是!stackA/B.isEmpty() || iterator.hasNext()。整个过程仍然需要同步。 - Didier L

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接