从流中收集连续的一对数据

120

给定一个流,例如{ 0, 1, 2, 3, 4 }

我如何最优雅地将其转换为给定的形式:

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

(当然,假设我已经定义了类Pair)?

编辑: 这不仅仅适用于int或原始流。答案应该适用于任何类型的流。


2
FP术语是“partition”,但我在Java中没有找到具有所需语义的方法。它在谓词上进行分区。 - Marko Topolnik
1
通常在JDK 8中,Spliterator被认为是用于遍历和分区的目的。我将尝试提供一个示例。 - Olimpiu POP
list.stream().map(i -> new Pair(i, i+1)); - aepurniet
2
有关非流问题的等效问题,请参阅 https://dev59.com/03TYa4cB1Zd3GeqPqAyL - Raedwald
顺便说一下,有些人使用 Map.Entry 的任何一种实现作为 Pair 类。(尽管有些人可能认为这是一个 hack,但使用内置类很方便。) - Basil Bourque
22个回答

84

Java 8的流库主要用于将流分成较小的块以进行并行处理,因此有状态的管道阶段受到了相当大的限制,不能做像获取当前流元素的索引和访问相邻流元素之类的事情。

通常解决这些问题的一种方式(当然有一些限制)是通过索引驱动流,并依赖于某些随机访问数据结构(如ArrayList)来处理正在处理的值。如果值在arrayList中,则可以通过执行以下操作按请求生成对:

    IntStream.range(1, arrayList.size())
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEach(System.out::println);

当然,这种方法的限制是输入不能是无限流。但是这个流水线可以并行运行。


5
“输入不能是无限流。” 实际上,输入根本不能是流。 输入(arrayList)实际上是一种集合,这就是为什么我没有将其标记为答案的原因。(但恭喜您获得金徽章!) - Aleksandr Dubinsky
在流中是否有一种条件跳过下一个迭代的方法,即将forEach或mapToObj索引递增到i+2而不是i+1?这是否不推荐在Java中使用流或函数式编程的用例? - Stacky

39

我提供的StreamEx库扩展了标准流,为所有流类型提供了pairMap方法。对于原始流,它不会更改流类型,但可以用于进行一些计算。最常见的用法是计算差异:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

对于对象流,您可以创建任何其他对象类型。我的库不提供任何新的用户可见数据结构,如 Pair(这是库概念的一部分)。但是,如果您有自己的 Pair 类并希望使用它,可以执行以下操作:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

或者如果您已经有一些Stream

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

这个功能是使用自定义分割器实现的。它的开销非常低,并且可以很好地并行化。当然,它适用于任何流源,而不仅仅是像许多其他解决方案那样的随机访问列表/数组。在许多测试中,它表现得非常出色。这里有一个JMH基准测试,在这个测试中,我们使用不同的方法查找所有输入值之前的较大值(请参见问题)。


谢谢!我越学习这个库,就越喜欢它。我可能会开始使用流了。(StreamEx实现了Iterable!万岁!) - Aleksandr Dubinsky
为了使您的答案完整无误,您能展示如何将一个 Stream 包装成一个 StreamEx 吗? - Aleksandr Dubinsky
3
请使用StreamEx.of(stream)。还有其他方便的静态方法可以从Collection、数组、Reader等创建流。已编辑答案。 - Tagir Valeev
@TagirValeev,pairMap在顺序流上有序吗?实际上,我想要一个forPairsOrdered()方法,但是由于没有这样的方法,我能否以某种方式模拟它?是stream.ordered().forPairs()还是stream().pairMap().forEachOrdered() - Askar Kalykov
1
@AskarKalykov,pairMap是具有非干扰无状态映射器函数的中间操作,其排序方式与简单的map相同。forPairs按规定是无序的,但对于顺序流而言,无序操作实际上是有序的。如果您将原始问题阐述为单独的stackoverflow问题以提供更多上下文,那将是很好的。 - Tagir Valeev
@TagirValeev,你的StreamEx能否像MapUtils.invertMap一样实现反转映射? - Nicolazz92

23
您可以使用Stream.reduce()方法来实现这一点(我没有看到其他答案使用此技术)。
public static <T> List<Pair<T, T>> consecutive(List<T> list) {
    List<Pair<T, T>> pairs = new LinkedList<>();
    list.stream().reduce((a, b) -> {
        pairs.add(new Pair<>(a, b));
        return b;
    });
    return pairs;
}

1
它会返回(1,2) (2,3),而不是(1,2) (3,4)。此外,我不确定它是否按顺序应用(当然没有保证)。 - Aleksandr Dubinsky
3
请检查问题,这是预期的行为。@Aleksandr Dubinsky - SamTebbs33
11
啊,是的,抱歉。想不到我写的就是这个。 - Aleksandr Dubinsky
2
这是一个非常聪明的想法!我唯一看到这种方法存在的问题是,缩减函数不是纯粹的(它依赖于外部的pairs对象)。因此,如果并发运行,其语义正确性不能得到保证。一个可能的解决方案是使用线程安全的数据结构,比如Vector - Aldan Creo
我喜欢这个想法,除了构建一个新列表是必要的因为reduce是一个终端操作。应该可以懒惰地完成这件事。 - Roger Keays

18

这不是优雅的解决方案,而是一个巧妙的应急之策,但适用于无限流。

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
    new Function<Integer, Pair>() {
        Integer previous;

        @Override
        public Pair apply(Integer integer) {
            Pair pair = null;
            if (previous != null) pair = new Pair(previous, integer);
            previous = integer;
            return pair;
        }
    }).skip(1); // drop first null

现在您可以将流限制为所需长度

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

附言:我希望有更好的解决方案,类似于Clojure中的(partition 2 1 stream)


6
感谢指出匿名类是有时作为 Lambda 的有用替代品。 - Aleksandr Dubinsky
17
这与流框架的设计完全相悖,直接违反了映射API的约定,因为匿名函数不是无状态的。如果使用并行流和更多数据运行此代码,使得流框架创建更多工作线程,你将看到结果:偶尔出现的随机“错误”,几乎不可能重现,并且在拥有足够数据之前(在生产环境中)很难检测到。这可能会造成灾难性后果。 - Mario Rossi
3
Streams框架的存在并不仅仅是为了编写并行代码。不幸的是,它的用途有两面性,许多程序员使用它来编写顺序代码。甚至还有一些内置方法无法并行化(例如 skip)。@MarioRossi - Aleksandr Dubinsky
4
@AleksandrDubinsky,你关于limit/skip可并行化的说法是不正确的;JDK提供的实现实际上可以并行工作。由于操作与遭遇顺序相关联,因此并行化可能并不总是会提供性能优势,但在高质量场景下,它是有用的。 - Brian Goetz
4
错误。如果流是无序的(没有定义的遇到顺序,因此逻辑上不存在“第一个”或“第n个”元素,只有元素),它可能会跳过一个随机元素。但是,无论流是有序还是无序,跳过操作始终能够并行工作。如果流是有序的,则可以提取的并行性较少,但仍然是并行的。 - Brian Goetz
显示剩余5条评论

17

我实现了一个Spliterator包装器,它从原始Spliterator中获取每个n个元素T,并生成List<T>

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {

    private final Spliterator<T> wrappedSpliterator;

    private final int n;

    private final Deque<T> deque;

    private final Consumer<T> dequeConsumer;

    public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
        this.wrappedSpliterator = wrappedSpliterator;
        this.n = n;
        this.deque = new ArrayDeque<>();
        this.dequeConsumer = deque::addLast;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        deque.pollFirst();
        fillDeque();
        if (deque.size() == n) {
            List<T> list = new ArrayList<>(deque);
            action.accept(list);
            return true;
        } else {
            return false;
        }
    }

    private void fillDeque() {
        while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
            ;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return wrappedSpliterator.estimateSize();
    }

    @Override
    public int characteristics() {
        return wrappedSpliterator.characteristics();
    }
}

以下方法可用于创建连续的流:
public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
    Spliterator<E> spliterator = stream.spliterator();
    Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
    return StreamSupport.stream(wrapper, false);
}

示例用法:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
    .map(list -> new Pair(list.get(0), list.get(1)))
    .forEach(System.out::println);

不是的。它创建一个包含List<E>元素的新流。每个列表都包含原始流中的n个连续元素。自己检查一下 ;) - Tomek Rękawek
你能否修改你的答案,使得除了第一个和最后一个元素之外的每个元素都被重复一遍? - Aleksandr Dubinsky
抱歉,我没有注意到元素应该被重复。我已经修正了我的解决方案。 - Tomek Rękawek
4
我认为这是一项出色的工作,并且应该推广到除了分区大小之外的任何步长。现在有很多需要“(分区大小步长)”函数的地方,而这是得到它的最佳方法。 - Marko Topolnik
3
考虑使用 ArrayDeque 以提高性能,而不是 LinkedList - Marko Topolnik
显示剩余3条评论

8
你可以在cyclops-react(我为这个库做贡献)中使用滑动操作符来实现此操作。
  LazyFutureStream.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

或者

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

假设Pair构造函数可以接受包含2个元素的集合。
如果您想按4进行分组,并增加2,这也是支持的。
     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
                .sliding(4,2)
                .forEach(System.out::println);

在cyclops-streams StreamUtils类中,还提供了用于创建滑动视图的等效静态方法,适用于java.util.stream.Stream。请注意,保留HTML标签,但不要写出解释。
       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

注意:对于单线程操作,ReactiveSeq可能更合适。LazyFutureStream扩展了ReactiveSeq,但主要是针对并发/并行使用(它是一个Future的流)。
LazyFutureStream扩展了ReactiveSeq,而ReactiveSeq又扩展了来自jOOλ的Seq(它又扩展了java.util.stream.Stream),因此Lukas提出的解决方案也适用于任何一种流类型。对于任何对窗口/滑动操作符感兴趣的人,主要区别在于明显的权力/复杂性权衡和适用于与无限流一起使用的能力(滑动不会消耗流,但会在流中缓存)。

这样你会得到[(0,1)(2,3) ...],但问题要求的是[(0,1)(1,2) ...]。请看我的RxJava答案... - frhack
1
你说得对,是我的错,我误读了问题 - 滑动运算符是在这里使用的正确选项。我会更新我的答案 - 谢谢! - John McClean

6

Streams.zip(..)在Guava中可用,供那些依赖它的人使用。

示例:

Streams.zip(list.stream(),
            list.stream().skip(1),
            (a, b) -> System.out.printf("%s %s\n", a, b));

5

proton-pack库提供了窗口函数的功能。如果给定一个Pair类和一个Stream,可以像这样实现:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
                                                  .map(l -> new Pair<>(l.get(0), l.get(1)))
                                                  .moreStreamOps(...);

现在,pairs 流包含:
(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on

然而,看起来你需要创建两次 st!这个库能否使用单个流解决问题? - Aleksandr Dubinsky
@AleksandrDubinsky 我不认为当前的分割器已经提供了这个功能。我已经提交了一个问题 https://github.com/poetix/protonpack/issues/9 - Alexis C.
@AleksandrDubinsky windowed 功能已经添加!请查看编辑。 - Alexis C.
1
为什么不删除你的旧答案,这样其他用户就可以看到解决方案,而不是历史记录。 - Aleksandr Dubinsky

5

查找连续的一对

如果您愿意使用第三方库并且不需要并行处理,那么jOOλ提供了以下SQL风格的窗口函数:

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window()
   .filter(w -> w.lead().isPresent())
   .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
   .toList()
);

产生收益

[(0, 1), (1, 2), (2, 3), (3, 4)]

lead()函数从窗口中按遍历顺序访问下一个值。

查找连续的三元组/四元组/n元组

评论中有一个问题询问更一般的解决方案,需要收集n元组(或可能是列表)。因此,这里提供另一种方法:

int n = 3;

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

产生一个列表的列表
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

如果没有 filter(w -> w.count() == n) 这一步过滤,结果会是:

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

免责声明:我在jOOλ背后的公司工作。

有趣。如果我需要将3个或更多元素分组怎么办?使用w.lead().lead()吗? - Raul Santelices
1
@RaulSantelices:tuple(w.value(), w.lead(1), w.lead(2)) 是一个选项。我已经更新了我的答案,提供了更通用的 length = n 解决方案。 - Lukas Eder
1
我理解得正确吗?.window()不是一种惰性操作,它会将整个输入流收集到某个中间集合中,然后从中创建一个新的流? - Tagir Valeev
@TagirValeev:是的,这就是当前的实现。在上面的情况中(没有使用Comparator重新排序窗口),那么像这样的优化是可能的,并且很可能会在未来实现。 - Lukas Eder

3

这个操作本质上是有状态的,因此并不是流(streams)所要解决的问题 - 请参见 javadoc 中的“无状态行为”部分:

最好完全避免使用带有状态的行为参数来进行流操作。

在这里的一个解决方案是通过外部计数器引入流中的状态,虽然它只能用于顺序流。

public static void main(String[] args) {
    Stream<String> strings = Stream.of("a", "b", "c", "c");
    AtomicReference<String> previous = new AtomicReference<>();
    List<Pair> collect = strings.map(n -> {
                            String p = previous.getAndSet(n);
                            return p == null ? null : new Pair(p, n);
                        })
                        .filter(p -> p != null)
                        .collect(toList());
    System.out.println(collect);
}


static class Pair<T> {
    private T left, right;
    Pair(T left, T right) { this.left = left; this.right = right; }
    @Override public String toString() { return "{" + left + "," + right + '}'; }
}

该问题要求收集输入流的连续元素,而不仅仅是收集连续整数。术语上的一个重要澄清:Stream !=“lambdas”。 - Aleksandr Dubinsky
你可以用AtomicReference来替代AtomicInteger。另一种选择是自己编写收集器,或者使用外部库,比如这个例子:https://dev59.com/Zl0a5IYBdhLWcg3wqKGq#30090528。 - assylias
请看我的编辑。另外,我不确定您对lambda!= stream的评论是否理解。使用匿名类的另一个答案本质上做了相同的事情,只是状态由匿名类持有而不是外部... - assylias
1
那很好。StreamEx 库也是一个不错的发现,本身可能就是一个答案。我对 "streams != lambdas" 的评论是指你所说的 "该操作基本上是有状态的,因此不是 lambda 所要解决的问题。" 我认为你意思想用 "streams" 这个词。 - Aleksandr Dubinsky
哦,我明白了 - 我已经澄清了。 - assylias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接