用Java 8 Streams替代传统的newForLoop

9

因此,最终我从Java 6跳到了Java 8,我已经阅读了相当多的Java 8 Streams API。不幸的是,被提问的几乎所有示例都与我试图弄清楚如何做的事情几乎接近,但又不够接近。

我手头的是:

final List<Function<? super Double, Double>> myList = generateList();
final double myVal = calculate(10);

private double calculate(double val) {
    for (Function<? super Double, Double> function : this.myList) {
        val += function.apply(val);
    }
    return val;
}

现在,我已经明白可以使用.stream().forEach()做类似的事情,但是这只适用于foreach和streams需要final变量。我尝试使用DoubleStream进行探索以获取sum(),但我需要重新应用当前总和到每个Function并将该总和添加到下一个函数中,就像上面的代码示例一样。 纯Stream API是否可以实现这一点?
编辑:因此,在对reduce()区域进行测试后,我对执行此类型计算所需的时间进行了简单的测试,结果不利于流。以下是一个示例https://gist.github.com/gabizou/33f616c08bde5ab97e56。其中包括从相当基本的测试中输出的日志。

不,实际上并不是这样的。"Stream(流)"操作并不适用于像您正在进行的本质上是连续的操作。 - Louis Wasserman
我理解这一点,只是希望流最终在性能方面也能达到同样的水平。从整体来看,对于大量函数集合,这可能最终会实现相当的性能平衡,但对于我所做的事情,这不太可能。当然,随着我们使用新的Java版本不断发展,我们可能会看到优化和改进。 - gabizou
3个回答

11
您可以使用流API来将函数列表组合成一个函数。
static List<Function<? super Double, Double>> myList
    = Arrays.asList(d -> d + 4, d -> d * 2, d -> d - 3);

static Function<Double, Double> total=myList.stream()
    .map(f -> (Function<Double, Double>) d -> d + f.apply(d))
    .reduce(Function::andThen).orElse(Function.identity());

static double calculate(double val) {
    return total.apply(val);
}

public static void main(String[] args) {
    System.out.println(calculate(10));
}

产生复合函数的流操作不存在结合性问题,理论上甚至可以并行执行(尽管在这里没有任何好处),而结果是一个单一函数,其本身是顺序的,永远不会被分解成需要是可结合的部分。


只是一点说明:并行化这个程序不仅没有“好处”,而且是可怕的。在顺序模式下,即使有1000个函数,它也运行得很好。使用并行流...即使有10个函数,它似乎会死锁或者发生其他错误。 - RealSkeptic
1
@RealSkeptic:那个死锁似乎是由于在类的“静态”初始化器中运行引起的。当我将代码移动到一个方法中时,它可以正常运行。我会调查这个问题,因为它似乎是一个严重的错误。 - Holger
2
@RealSkeptic:问题在于Lambda表达式会产生新的类,这些类会在其定义的类上调用合成方法。在未完全初始化的类上执行异步方法调用将等待初始化完成,因此如果初始化器本身等待该异步操作的完成,则会出现死锁。因此,在“静态”初始化程序中进行并行流操作是不可行的。 - Holger
谢谢解释。这个应该在某个地方用粗体字写出来。也许你应该创建一个自问自答的问题来解决它。 - RealSkeptic
这段代码具体是做什么的?我刚刚运行了它,使用代码中的值返回了141,但我预期应该是41([10 + 4] + [10 * 2] + [10 - 3])。 - Leonardo Pina
2
@Leonardo Pina:组合不是像 f(x)+g(x)+h(x),而是像 h(g(f(x))),即一个函数的输出是下一个函数的输入。在这种特殊情况下,每个函数 f 都被投影到 x+f(x),因此,所得到的函数就像 h(g(f(x)+x)+f(x)+x)+g(f(x)+x)+f(x)+x。这也是问题中的循环所做的事情;它将函数的结果添加到 val,但 val 是下一个函数的参数。 - Holger

4

是的,您可以使用流处理方案,通过执行“缩减”操作:

private double calculate(double val) {
    return myList.stream().reduce(val, (d, f) -> d + f.apply(d), (a, b) -> a + b);
}

一种减少操作可将每个元素聚合(减少)为一个值。 reduce() 方法有三种不同的用法 - 此处使用的是本技术。


一些测试代码:

static Function<? super Double, Double> a = (d) -> d + 4;
static Function<? super Double, Double> b = (d) -> d * 2;
static Function<? super Double, Double> c = (d) -> d - 3;
static List<Function<? super Double, Double>> myList = Arrays.asList(a, b, c);

static double calculate(double val) {
    return myList.stream().reduce(val, (d, f) -> d + f.apply(d), (a, b) -> a + b);
}

public static void main(String[] args) {
    System.out.println(calculate(10));
}

输出:

141.0

2
我认为它并不做同样的事情。原始代码针对三个函数f、g、h,会给出f(val) + g(f(val)) + h(f(val) + g(f(val)))的结果,而那个reducer允许使用“+”来组合部分结果,因此你可能会得到f(val) + g(val) + h(val)或类似的结果。这里的组合器不符合文档中的约束条件。 - RealSkeptic
我刚刚进行了逻辑测试,代码确实是这样做的。10 + (10 + 4) = 24 然后 24 + (24 *2) = 72 然后 72 + (72 - 3) = 141最终结果仍然相同。现在我真的很好奇在这种情况下使用流与传统方法相比的性能成本。 - gabizou
2
实际上,这是为(希望)即将推出的foldLeft而工作。在这里,操作是非关联的,因此reduce在并行情况下会产生错误的结果。 - Tagir Valeev
1
这就是问题所在,不是吗?合并器从未被调用的事实是未记录的,并且是一个实现细节,不应该依赖它。这是非常有意的,任何使用不会为相同流的并行和非并行版本产生相同结果的情况都应该避免。 - RealSkeptic
3
如果代码依赖于组合器从未被调用,最起码应该提供(x,y)->{throw new AssertionError();}作为组合器,而不是产生错误值的静默输出。@gabizou:你的性能测试缺乏预热阶段,因此你衡量的是流框架的类加载和初始化时间。 - Holger
显示剩余3条评论

0

这个例子对于Java 8流来说非常棘手。它们被设计用于顺序不重要的操作。

函数应用不是结合律的。为了解释,让我们举一个更简单的例子,假设我们想要取一个数字并将其除以一系列数字:

static List<Double> dividers = Arrays.asList( 3.5, 7.0, 0.5, 19.0 );

public double divideByList( double a ) {
    for ( Double d : dividers ) {
        a /= d;
    }
    return a;
}

所以,你得到的是

a ÷ 3.5 ÷ 7.0 ÷ 0.5 ÷ 19.0

这个算术很简单 - 除法是左结合的,意味着这等同于

a ÷ ( 3.5 × 7.0 × 0.5 × 19.0)

不是

a ÷ ( 3.5 ÷ 7.0 ÷ 0.5 ÷ 19.0 )

不是

( a ÷ 3.5 ÷ 7.0 ) ÷ ( 0.5 ÷ 19.0 )

基于reduce/collectors的流操作要求“归约”操作是左结合的。这是因为它们想允许该操作并行化,使得一些线程执行一些操作,然后将结果组合起来。现在,如果您的运算符是乘法而不是除法,那就不会有问题,因为

a × 3.5 × 7.0 × 0.5 × 19.0

等同于

(a × 3.5 × 7.0 ) × (0.5 × 19)

这意味着一个线程可以执行 a × 3.5 × 7.0,另一个线程可以执行 0.5 × 19.0 操作,然后你可以将结果相乘,得到与顺序计算相同的结果。但对于除法来说,这种方法行不通。

函数应用也是非关联的,就像除法一样。也就是说,如果你有函数 fgh,并且你运行顺序计算,你最终会得到:

result = val + f(val) + g(val + f(val)) + h(val + f(val) + g(val + f(val)))

现在,如果您有两个中间线程,一个应用fg,另一个应用h,并且您想要组合结果-没有办法将正确的值放入h


你可能会尝试使用像Stream.reduce这样的方法,就像@Bohemian建议的那样。但是文档警告你不要这样做:

<U> U reduce(U identity,
             BiFunction<U,? super T,U> accumulator,
             BinaryOperator<U> combiner)

...

The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible with the accumulator function; for all u and t, the following must hold:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

对于像+这样的操作,其身份是0。对于*,其身份是1。因此,将您的val用作identity是违反文档规定的。而第二个条件更加棘手。

尽管非并行流的当前实现不使用combiner部分,这使得两个条件都是不必要的,但这是未记录在文档中的,未来的实现或不同JRE的实现可能决定创建中间结果并使用combiner将它们连接起来,也许是为了提高性能或出于其他考虑。

因此,尽管很诱人,但不应该使用Stream.reduce来尝试模仿原始的顺序处理。


有一种方法可以做到这一点,而不会真正破坏文档。它涉及保持一个可变对象来保存结果(必须是对象,以便在仍然可变的同时有效地最终),并使用{{link1:Stream.forEachOrdered}},如果流是有序的,则保证操作将按照它们在流中出现的顺序执行。列表的流具有定义的顺序。即使您使用myList.stream().parallel(),这也适用。

public static double streamedCalculate(double val) {
    class MutableDouble {
        double currVal;
        MutableDouble(double initVal) {
            currVal = initVal;
        }
    }

    final MutableDouble accumulator = new MutableDouble(val);

    myList.stream().forEachOrdered((x) -> accumulator.currVal += x.apply(accumulator.currVal));
    return accumulator.currVal;
}

就我个人而言,我发现你原来的循环比这个更易读,所以在这里使用流并没有任何优势。

根据@Tagir Valeev的评论,未来Java版本中计划添加foldLeft操作。那时候它可能看起来更加优雅。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接