如何交错(合并)两个Java 8 Streams?

16
 Stream<String> a = Stream.of("one", "three", "five");
 Stream<String> b = Stream.of("two", "four", "six");
我需要做些什么才能得到以下输出?
// one
// two
// three
// four
// five
// six

我研究了一下concat函数,但正如javadoc所解释的那样,它只是将一个字符串接在另一个字符串后面,而不会交错插入。

Stream<String> out = Stream.concat(a, b);
out.forEach(System.out::println);
创建一个延迟连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。
 // one
 // three
 // five
 // two
 // four
 // six

如果我收集它们并进行迭代,我可以做到,但希望有更符合Java8的、流式的方法 :-)

注意

我不想压缩这些流

"zip"操作将从每个集合中获取一个元素并将它们组合。

zip操作的结果会像这样:(不需要的)

 // onetwo
 // threefour
 // fivesix

1
为什么zip不会保持相同的元素总数? - Ousmane D.
阅读其他线程,zip(压缩)总是需要一个拉链函数来将每个流的元素组合成一个新元素。我只想交替排列而不是压缩。 - Blundell
1
我理解你的观点,感谢澄清。使用上述重复项中的 zip 函数,可以执行 Stream<String> result = zip(a, b, (e, z) -> Stream.of(e, z)).flatMap(x -> x); 以获得所需的结果。 - Ousmane D.
2
对于未来来到这里的任何人,这是评论+重定向答案:https://gist.github.com/blundell/3f062b8ec55fd1906c68e6ec8d848683 - Blundell
1
我喜欢创建interleave方法,它本质上是将zip方法包装起来以提高可读性等方面的优化。我已经投票重新开放,所以你可以在这里发布而不是外部... - Ousmane D.
显示剩余2条评论
8个回答

15

我会使用类似这样的内容:

public static <T> Stream<T> interleave(Stream<? extends T> a, Stream<? extends T> b) {
    Spliterator<? extends T> spA = a.spliterator(), spB = b.spliterator();
    long s = spA.estimateSize() + spB.estimateSize();
    if(s < 0) s = Long.MAX_VALUE;
    int ch = spA.characteristics() & spB.characteristics()
           & (Spliterator.NONNULL|Spliterator.SIZED);
    ch |= Spliterator.ORDERED;

    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(s, ch) {
        Spliterator<? extends T> sp1 = spA, sp2 = spB;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Spliterator<? extends T> sp = sp1;
            if(sp.tryAdvance(action)) {
                sp1 = sp2;
                sp2 = sp;
                return true;
            }
            return sp2.tryAdvance(action);
        }
    }, false);
}

尽可能保留输入流的特性,这样可以进行某些优化(例如用于count()toArray())。此外,即使输入流可能是无序的,它也会添加ORDERED以反映交错。

当一个流拥有比另一个流更多的元素时,剩余的元素将出现在末尾。


使用Stream<? extends T> a是否更好,因为这是一种更通用的解决方案?只是问一下,因为我在另一个答案这里中引用了它。 - Naman
1
@Naman 当然。只是在整个代码中都要使用“? extends”有点烦人。但是为了好的API,应该这样做。 - Holger

2
一种比Holger做的更简单的解决方案,但也许它能够满足你的需求:
private static <T> Stream<T> interleave(Stream<T> left, Stream<T> right) {
    Spliterator<T> splLeft = left.spliterator();
    Spliterator<T> splRight = right.spliterator();

    T[] single = (T[]) new Object[1];

    Stream.Builder<T> builder = Stream.builder();

    while (splRight.tryAdvance(x -> single[0] = x) && splLeft.tryAdvance(builder)) {
        builder.add(single[0]);
    }

    return builder.build();
}

这个不一致的情况包括了left的所有元素,当它比right有更多的元素时,但会删除right的元素,当它比left多时。你应该决定。要包含所有共同的元素,请使用do {} while(splLeft.tryAdvance(builder) && spRight.tryAdvance(builder));,然后再决定。如果您想在流具有不同大小时包含所有元素,请在循环后执行(splLeft.tryAdvance(builder)? splLeft: spRight).forEachRemaining(builder);。而且,Stream.Builder<T>已经方便地实现了Consumer<T> - Holger
@Holger 实际上我只想要常见的,但现在问题变得更糟了,因为 do {} while(splLeft.tryAdvance(builder) && spRight.tryAdvance(builder)); 仍会从 left 中取一个元素,所以它仍然不正确 :(. 我现在看到的更大的问题是,在我思考它时这是作弊的,Stream.Builder 仍然使用隐藏集合来收集元素... - Eugene
从左边取一个额外元素并不违反“交错”模式(ababa)。如果你不想这样,考虑跟踪计数并对生成的流应用limit。这比处理一个额外的存储操作要简单得多,特别是对于通用数组而言。构建器意味着一种存储方式,我以为你已经知道了,因为这是实现分裂迭代器的主要(唯一)缺点。 - Holger
为了挽救你的方法,不妨试试 for(List<T> tmp = new ArrayList<>(1); splRight.tryAdvance(tmp::add) && splLeft.tryAdvance(builder); tmp.clear()) tmp.forEach(builder); - Holger
1
@FedericoPeraltaSchaffner 是的,这就是关于内存集合的确切观点,总的来说,只需使用Holger放置的任何内容即可... - Eugene
显示剩余4条评论

2

从问题评论中可以看出,我尝试使用zip进行操作:

Stream<String> a = Stream.of("one", "three", "five");
Stream<String> b = Stream.of("two", "four", "six");

Stream<String> out = interleave(a, b);


    public static <T> Stream<T> interleave(Stream<T> streamA, Stream<T> streamB) {
        return zip(streamA, streamB, (o1, o2) -> Stream.of(o1, o2)).flatMap(s -> s);
    }

    /**
    * https://dev59.com/VGMm5IYBdhLWcg3wZ-XQ
    **/
    private static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
        final Iterator<A> iteratorA = streamA.iterator();
        final Iterator<B> iteratorB = streamB.iterator();
        final Iterator<C> iteratorC = new Iterator<C>() {
            @Override
            public boolean hasNext() {
                return iteratorA.hasNext() && iteratorB.hasNext();
            }

            @Override
            public C next() {
                return zipper.apply(iteratorA.next(), iteratorB.next());
            }
        };
        final boolean parallel = streamA.isParallel() || streamB.isParallel();
        return iteratorToFiniteStream(iteratorC, parallel);
    }

    private static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
        final Iterable<T> iterable = () -> iterator;
        return StreamSupport.stream(iterable.spliterator(), parallel);
    }

1

不使用任何外部库(使用JDK11)

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class MergeUtil {

    private static <T> Stream<T> zipped(List<T> lista, List<T> listb) {
        int maxSize = Math.max(lista.size(), listb.size());
        final var listStream = IntStream
                .range(0, maxSize)
                .mapToObj(i -> {
                    List<T> result = new ArrayList<>(2);
                    if (i < lista.size()) result.add(lista.get(i));
                    if (i < listb.size()) result.add(listb.get(i));
                    return result;
                });
        return listStream.flatMap(List::stream);
    }

    public static void main(String[] args) {
        var l1 = List.of(1, 2, 3);
        var l2 = List.of(4, 5, 6, 7, 8, 9);
        final var zip = zipped(l1, l2);
        System.out.println(zip.collect(Collectors.toList()));
    }

}

listStream是一个返回扁平化后的Stream<List<A>>

结果为:[1, 4, 2, 5, 3, 6, 7, 8, 9]


1
这可能不是一个好的答案,因为
(1) 它会收集到映射中,我猜你不想这样做,
(2) 它不是完全无状态的,因为它使用了AtomicIntegers。

但仍然添加它,因为
(1) 它易于阅读,
(2) 社区可以从中获得灵感并尝试改进它。

Stream<String> a = Stream.of("one", "three", "five");
Stream<String> b = Stream.of("two", "four", "six");

AtomicInteger i = new AtomicInteger(0);
AtomicInteger j = new AtomicInteger(1);

Stream.of(a.collect(Collectors.toMap(o -> i.addAndGet(2), Function.identity())),
        b.collect(Collectors.toMap(o -> j.addAndGet(2), Function.identity())))
        .flatMap(m -> m.entrySet().stream())
        .sorted(Comparator.comparing(Map.Entry::getKey))
        .forEach(e -> System.out.println(e.getValue())); // or collect

输出

one
two
three
four
five
six

@Holger的编辑

Stream.concat(a.map(o -> new AbstractMap.SimpleEntry<>(i.addAndGet(2), o)),
        b.map(o -> new AbstractMap.SimpleEntry<>(j.addAndGet(2), o)))
        .sorted(Map.Entry.comparingByKey())
        .forEach(e -> System.out.println(e.getValue())); // or collect

你不需要收集到映射中,因为你只对获取条目流感兴趣,所以你可以简单地使用 Stream.concat( a.map(o -> new AbstractMap.SimpleEntry<>(i.addAndGet(2),o)), b.map(o -> new AbstractMap.SimpleEntry<>(j.addAndGet(2),o)) ) 来获取它。然后,你可以链式调用 .sorted(Map.Entry.comparingByKey())。但你是正确的,这种可变状态是不被鼓励的。最值得注意的是,它会在并行执行时出现问题。 - Holger
@Holger 谢谢,我已经将这个添加到答案中了。我之前也考虑过,但是找不到 EntrySet 构造函数,懒得去谷歌搜索如何创建一个 EntrySet :( - Kartik
它不是创建一个EntrySet,而只是一个Entry实例的流。现成的实现确实不容易找到(在AbstractMap中还有一个SimpleImmutableEntry)。从Java 9开始,您可以简单地使用Map.entry(key, value)来获取一个不可变的Entry实例,但您必须意识到它不支持null键或值,因此只能在您可以排除null时使用它。 - Holger

1
一种使用Iterator的解决方案。
final Iterator<String> iterA = a.iterator();
final Iterator<String> iterB = b.iterator();

final Iterator<String> iter = new Iterator<String>() {
  private final AtomicInteger idx = new AtomicInteger();
  @Override
  public boolean hasNext() { 
    return iterA.hasNext() || iterB.hasNext();
  }
  @Override
  public String next() {
    return idx.getAndIncrement() % 2 == 0 && iterA.hasNext() ? iterA.next() : iterB.next();
  }
};

 // Create target Stream with StreamEx from: https://github.com/amaembo/streamex    
 StreamEx.of(iter).forEach(System.out::println);

 // Or Streams from Google Guava
 Streams.stream(iter).forEach(System.out::println);

或者简单地使用我提供的abacus-common中的解决方案:

 AtomicInteger idx = new AtomicInteger();
 StreamEx.merge(a, b, (s1, s2) -> idx.getAndIncrement() % 2 == 0 ? Nth.FIRST : Nth.SECOND).forEach(Fn.println()); 

1
你不需要任何锤子。对于第一个流的每个元素,构建一个包含该元素和第二个流的元素(通过迭代器提取)的流,然后使用flatMap方法:
Stream<String> a = Stream.of("one", "three", "five");
Stream<String> b = Stream.of("two", "four", "six");
Iterator<String> bi = b.iterator();
a.flatMap( x -> Stream.of(x, bi.next()) ).forEach(System.out::println);

0

使用Guava的Streams.zipStream.flatMap

Stream<String> interleaved = Streams
        .zip(a, b, (x, y) -> Stream.of(x, y))
        .flatMap(Function.identity());

interleaved.forEach(System.out::println);

输出:

one
two
three
four
five
six

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接