从流和元素生成流,Java 8。

3
我正在努力理解一些Java 8 Stream功能。我对FP有一定的了解,因为30年前写过一些Lisp,但我认为我可能在尝试做一些这个新设施并不真正针对的事情。无论如何,如果问题很愚蠢,我很乐意学习我的错误。
我将提供一个具体的问题,虽然实际上我正在尝试解决一个通用的概念。
假设我想从流中的每第三个元素获取一个流。在常规FP中,我会(大约)创建一个递归函数,通过将列表的第一个元素与删除两个元素后剩余列表的(调用自身)连接来操作。足够容易。但是要在流中执行此操作,我觉得我想要以下两种工具之一:
1)一种操作可以从流中提取多个项目进行处理(然后我只需获取三个,使用第一个,然后丢弃其余部分)
2)一种制作以项和流为参数并创建流的供应商的方法。然后感觉我可以从第一个项目和缩短的流中创建下游流,尽管对我来说仍不清楚这是否会执行必要的递归魔法以实际工作。
开始编辑
所以,有一些有趣和有用的反馈;谢谢大家。特别是,评论帮助我更好地澄清了我的头脑正在尝试解决的问题。
首先,可以 - 至少在概念上 - 知道/需要知道序列中的顺序不应阻止人们允许完全可并行化的操作。我想到了一个例子,那就是图形人员倾向于执行的卷积操作。想象一下模糊图像。每个像素都是通过与其附近的像素相关来修改的,但是这些像素只是被读取,而不是自己进行修改。
据我了解(当然非常不稳定!),流机制是VM管理的并行世界的主要入口点,而迭代器仍然是它们一直以来的样子(是吗?不是吗?)如果正确的话,那么使用迭代器来解决我正在徘徊的问题领域似乎并不好。
因此,至少目前,创建一个分块分离器的建议似乎是最有前途的,但是支持该示例的代码似乎很费力!我认为我宁愿使用ForkJoin机制,尽管现在它已经“老套”了:)
无论如何,仍然对任何人提供的更多见解感兴趣。
结束编辑
有什么想法吗?我是否试图使用这些流来执行它们不打算执行的操作,还是我错过了一些明显的东西?
祝好, 托比。

2
“我是不是在尝试使用这些流来做它们本来不打算做的事情?”简而言之,是的。 - Misha
2
流通常旨在支持整个流的完全可并行操作;它们实际上不支持依赖于元素索引的操作,也不支持逐个元素的修改。 - Louis Wasserman
2
从标题和描述来看,我有点困惑主要是:从流和元素创建流,还是跳过流的元素?这在其他语言中“相同”的事实使得很难确定对您来说什么是相关的。然而,我不想明确反对之前的评论,但认为“这样的东西应该很容易”可能是合理的。http://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html的文档包含基于“TaggedArray”的流,这可能至少暗示了可能的方法。 - Marco13
就像Louis Wasserman所说,流并不是为这样的操作而设计的。那么我的问题是:你是否真的在寻找一种过滤集合中每三个元素的方法?这可以通过使用迭代器而不是流来实现。 - Sauli Tähkäpää
4个回答

1
记住的一件事是,Stream 主要设计为利用并行处理。这意味着它们有许多与之关联的条件,旨在给虚拟机自由地处理元素,以任何方便的顺序进行处理。其中一个例子是坚持要求归约函数是可结合的。另一个是操纵的局部变量是最终的。这些类型的条件意味着流项目可以按任意顺序评估和收集。
这导致的一个自然结果是,Stream 的最佳应用案例不涉及流值之间的依赖关系。诸如将整数流映射到它们的累积值之类的事情,在像LISP这样的语言中是微不足道的,但对于Java流来说却是相当不自然的适合方式(参见this question)。

有一些巧妙的方法可以通过使用sequential来强制Stream不并行以绕过这些限制,但我的经验是这些方法比它们所获得的价值更为麻烦。如果您的问题涉及需要状态处理值的基本顺序系列,则建议使用传统集合和迭代。代码将变得更清晰,并且在不能并行化流的情况下,性能也不会变差。

话虽如此,如果您真的想这样做,最直接的方法是使用一个收集器来存储每三个项目,然后将它们作为流再次发送:

class EveryThird {

    private final List<Integer> list = new ArrayList<>();
    private int count = 0;

    public void accept(Integer i) {
        if (count++ % 3 == 0)
            list.add(i);
    }

    public EveryThird combine(EveryThird other) {
        list.addAll(other.list);
        count += other.count;
        return this;
    }

    public Stream<Integer> stream() {
        return list.stream();
    }
}

这可以像这样使用:

IntStream.range(0, 10000)
    .collect(EveryThird::new, EveryThird::accept, EveryThird::combine)
    .stream()

但这并不是收集器的设计初衷,这样做非常低效,因为它在不必要地收集流。如上所述,我建议在这种情况下使用传统迭代。


1

我的StreamEx库增强了标准的Stream API。特别是它添加了headTail方法,允许递归定义自定义操作。它接收一个函数,该函数接收流头(第一个元素)和尾部(其余元素的流),并应返回结果流,该流将代替原始流。例如,您可以按以下方式定义every3操作:

public static <T> StreamEx<T> every3(StreamEx<T> input) {
    return input.headTail(
        (first, tail1) -> tail1.<T>headTail(
            (second, tail2) -> tail2.headTail(
                (third, tail3) -> every3(tail3))).prepend(first));
}

这里prepend也被使用了,它只是将给定的元素添加到流的开头(这个操作就像headTail的好朋友一样)。

通常情况下,使用headTail,你可以定义几乎任何你想要的中间操作,包括现有的和新的。你可以在这里找到一些示例here

请注意,我实现了一些机制来优化这种递归操作定义中的尾部,因此正确定义的操作不会在处理长流时耗尽整个堆栈。


0
Java的流与FP(惰性)序列完全不同。如果你熟悉Clojure,这个区别就像是“惰性序列”和“reducer”之间的区别一样。而“惰性序列”将每个元素的处理都封装起来,因此允许逐个获取经过处理的元素,而“reducer”则将整个序列折叠成一个原子操作。
特别针对你所描述的例子,考虑依赖于流的“分区”转换,详细描述可以参考这里。然后你就可以轻松地进行操作。
partition(originalStream, 3).map(xs -> xs.get(0));

导致流具有原始数据每三个元素中的一个。

这将保持效率、惰性和可并行性。


0
你可以使用(或查看源代码)BatchingSpliterator。然后,给定aStream,你可以创建一个由大小为3的列表组成的流(除了最后一个可能不满3个元素),并使用该列表的第一个元素。
Stream<T> aStream = ...;
Stream<List<T>> batchedStream =  
    new BatchingSpliterator.Builder().wrap(aStream).batchSize(3).stream();

batchedStream.map(l -> l.get(0) ). ... 

你也可以“并行运算”:

batchedStream.parallel().map(l -> l.get(0) ). ....    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接