有状态和无状态lambda表达式有什么区别?

9
根据OCP书籍,必须避免有状态操作,也就是有状态lambda表达式。书中提供的定义是,“有状态lambda表达式是指其结果取决于在管道执行过程中可能发生变化的任何状态。”
书中提供了一个例子,使用并行流和.map()函数将一组固定数字添加到同步ArrayList中。ArrayList中的顺序完全随机,这应该让我们看到,有状态lambda表达式会在运行时产生不可预测的结果。因此,在使用并行流时强烈建议避免有状态操作,以消除任何潜在的数据副作用。
他们没有展示一个无状态lambda表达式来解决同样的问题(向同步ArrayList添加数字),我仍然不明白使用map函数向空同步ArrayList填充数据的问题在哪里... 到底是哪个状态可能会在管道执行期间发生变化?他们是指ArrayList本身吗?比如当另一个线程在并行流仍在添加数字的过程中决定向ArrayList添加其他数据,从而改变最终结果?
也许有人可以为我提供一个更好的例子,展示什么是有状态lambda表达式,以及为什么应该避免使用它。非常感谢。
谢谢

1
你能添加一段代码片段,以便我们确切地知道你所谈论的用例是什么吗? - Oliver Charlesworth
5个回答

5
第一个问题是:
 List<Integer> list = new ArrayList<>();

    List<Integer> result = Stream.of(1, 2, 3, 4, 5, 6)
            .parallel()
            .map(x -> {
                list.add(x);
                return x;
            })
            .collect(Collectors.toList());

System.out.println(list);

由于您正在向非线程安全的集合ArrayList添加元素,所以无法预测结果。

即使您这样做:

  List<Integer> list = Collections.synchronizedList(new ArrayList<>());

当列表没有可预测的顺序时,执行相同的操作。多个线程会添加到此同步集合中。通过添加同步集合,您保证了所有元素都被添加(与普通的ArrayList不同),但它们将以何种顺序存在是未知的。
请注意,列表没有任何排序保证,这称为处理顺序。对于此特定示例,“result”保证为:[1, 2, 3, 4, 5, 6]
根据问题,通常可以摆脱有状态的操作;对于您的示例,返回同步列表将是:
 Stream.of(1, 2, 3, 4, 5, 6)
            .filter(x -> x > 2) // for example a filter is present
            .collect(Collectors.collectingAndThen(Collectors.toList(), 
                          Collections::synchronizedList));

1
顺便提一下,这里可以找到一个很好的例子(也许你可以添加对文档的引用?):https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#stateful_lambda_expressions - JDC
如果您不关心顺序,为什么会成为问题呢?文档中说:“避免在流操作的参数中使用有状态的lambda表达式”https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html 但据我所知,唯一的后果就是无法预测输出的顺序。如果您正在并行执行API调用,并且不关心结果返回的顺序,那么问题在哪里呢? - mowwwalker

3

有状态的lambda表达式是指其结果取决于管道执行期间可能发生变化的任何状态。另一方面,无状态的lambda表达式是指其结果不依赖于管道执行期间可能发生变化的任何状态。

来源:OCP:Oracle认证专业Java SE 8程序员II学习指南:考试1Z0-809,作者Jeanne Boyarsky,Scott Selikoff

    List < Integer > data = Collections.synchronizedList(new ArrayList < > ());

            Arrays.asList(1, 2, 3, 4, 5, 6, 7).parallelStream()


                   .map(i -> {
                    data.add(i);
                    return i;
                }) // AVOID STATEFUL LAMBDA EXPRESSIONS!
                .forEachOrdered(i -> System.out.print(i+" "));


            System.out.println();
            for (int e: data) {
                System.out.print(e + " ");

可能的输出结果:

1 2 3 4 5 6 7 
1 7 5 2 3 4 6 

强烈建议在使用并行流时避免状态操作,以消除任何潜在的数据副作用。实际上,在串行流中尽可能避免使用它们,因为它们会阻止您的流利用并行化。


3

为了举例说明,我们考虑以下的Consumer函数(注意:此处不考虑该函数的实用性):

public static class StatefulConsumer implements IntConsumer {

    private static final Integer ARBITRARY_THRESHOLD = 10;
    private boolean flag = false;
    private final List<Integer> list = new ArrayList<>();

    @Override
    public void accept(int value) {
        if(flag){   // exit condition
            return; 
        }
        if(value >= ARBITRARY_THRESHOLD){
            flag = true;
        }
        list.add(value); 
    }

}

这是一个消费者,它会向列表(List)中添加项目(不考虑如何获取列表或线程安全性),并具有标志(表示状态)。

其逻辑是,一旦达到阈值,消费者应停止添加项目。

你的书想表达的意思是,由于函数消耗流元素的顺序没有保障,输出是不确定的。

因此,他们建议只使用无状态函数,这意味着它们将始终在相同输入下产生相同的结果。


3

以下是一个有状态操作每次返回不同结果的示例:

public static void main(String[] args) {

Set<Integer> seen = new HashSet<>();

IntStream stream = IntStream.of(1, 2, 3, 1, 2, 3);

// Stateful lambda expression
IntUnaryOperator mapUniqueLambda = (int i) -> {
    if (!seen.contains(i)) {
        seen.add(i);
        return i;
    }
    else {
        return 0;
    }
};

int sum = stream.parallel().map(mapUniqueLambda).peek(i ->   System.out.println("Stream member: " + i)).sum();

System.out.println("Sum: " + sum);
}

在我的情况下,当我运行代码时,我得到了以下输出:

Stream member: 1
Stream member: 0
Stream member: 2
Stream member: 3
Stream member: 1
Stream member: 2
Sum: 9

如果我正在向哈希集合中插入数据,为什么我的总和是9呢?
答案是:不同的线程获取了IntStream的不同部分。 例如,值1和2可能会被分配到不同的线程上。


1

有状态的lambda表达式是指其结果取决于在流水线执行过程中可能发生变化的任何状态。

让我们通过以下示例来理解:

    List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    List<Integer> result = new ArrayList<Integer>();

    list.parallelStream().map(s -> {
            synchronized (result) {
              if (result.size() < 10) {
                result.add(s);
              }
            }
            return s;
        }).forEach( e -> {});
     System.out.println(result);  

当您运行此代码5次时,输出可能每次都不同。原因在于Lambda表达式在map中的处理会更新结果数组。由于结果数组取决于特定子流的数组大小,因此每次调用此并行流时都会发生变化。
为了更好地理解并行流: 并行计算涉及将问题分成子问题,同时解决这些问题(并行运行,每个子问题在单独的线程中运行),然后组合解决方案的结果。当流以并行方式执行时,Java运行时将流划分为多个子流。聚合操作并行迭代和处理这些子流,然后组合结果。
希望这有所帮助!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接