减少返回并行流的结果是不可预测的

3
我使用Java Stream Reduce编写了以下代码示例:
Person reducedPerson = Person.getPersons().stream()
                .parallel()  //will return surprising result
                .reduce(new Person(), (intermediateResult, p2) -> {
                            intermediateResult.setAge(intermediateResult.getAge() + p2.getAge());
                            return intermediateResult;
                        },
                        (ir1, ir2) -> {
                            ir1.setAge(ir1.getAge() + ir2.getAge());
                            return ir1;
                        });
        System.out.println(reducedPerson);

模型:

public class Person {

    String name;

    Integer age;

    public Person() {
        age = 0;
        name = "default";
    }

    //...
    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public static Collection<Person> getPersons() {
        List<Person> persons = new ArrayList<>();
        persons.add(new Person("Vasya", 12));
        persons.add(new Person("Petya", 32));
        persons.add(new Person("Serj", 10));
        persons.add(new Person("Onotole", 18));
        return persons;
    }
}

每个代码示例执行的结果都不同:

例如: Person{name='默认', age=256} 或者

Person{name='default', age=248}

我已经确定问题在combiner中,因为在顺序流代码中执行正确。

请帮忙纠正组合器。

P.S.

期望的结果是名称为“default”的人和年龄为72岁(列表中所有人的总年龄)。

P.S.

对于整数的相同代码作为缩小结果可以正常工作:

Integer age = Person.getPersons().stream()
                .parallel()
                .reduce(0, (intermediateResult, p2) -> {
                    intermediateResult = intermediateResult + p2.getAge();
                    return intermediateResult;
                }, (ir1, ir2) -> {
                    System.out.println("combiner");
                    ir1 = ir1 + ir2;
                    return ir1;
                });
        System.out.println(age);

7
这是因为您的 reduce 调用违反了合同的所有可能部分。请阅读文档。您不应该在流中改变对象。 - Boris the Spider
3
是的,应该这样做。您不需要合并,只需改变其中一个对象然后返回它即可。 - Boris the Spider
1
不清楚为什么您使用了reduce的三个参数版本而不是两个参数版本,因为累加器与输入具有相同的类型。 - Louis Wasserman
3
@gstackoverflow 是个独立的问题,但你仍然应该这么做。话虽如此,为什么不写更合理、更高效、更直接的 Person.getPersons().stream().parallel().mapToInt(Person::getAge).sum() 呢?为什么需要生成一个 Person 而不是一个求和后的年龄,后者会更合乎逻辑呢? - Louis Wasserman
1
你是在同一个程序中多次调用它吗?还是只是重新运行整个程序并获得不同的结果? - Louis Wasserman
显示剩余14条评论
2个回答

7

要进行可变减少操作,请使用collect

reducedPerson = Person.getPersons().parallelStream()
        .collect(
                Person::new,
                (p, q) -> p.setAge(p.getAge() + q.getAge()),
                (p, q) -> p.setAge(p.getAge() + q.getAge())
        );

collect 是专门设计用于安全地在并行环境下累积到可变容器中的。


2
正如Boris所指出的,问题在于流内部的变异。
大多数流操作接受描述用户指定行为的参数,例如上面示例中传递给mapToInt的lambda表达式w -> w.getWeight()。为了保持正确的行为,这些行为参数必须:
- 必须是非干扰性的(它们不会修改流源); - 在大多数情况下必须是无状态的(它们的结果不应依赖于在执行流管道期间可能发生更改的任何状态)。
这里有一个使用reduce的版本,以及使用mapToInt和sum的更直观版本。
链接:https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
class gstackoverflow{
  public static void main(String... args) {
    Person reducedPerson = Person.getPersons().stream()
        .parallel()  //will NOT return surprising result
        .reduce(new Person("default",0),
            (ir1, ir2) -> //no longer mutates
                new Person(String.join(",", ir1.getName(), ir2.getName()), ir1.getAge() + ir2.getAge())
        );
    System.out.println(reducedPerson);

    //here is a clean(er) way to do it:
    int totalAge = Person.getPersons().stream()
        .parallel()  //will NOT return surprising result
        .mapToInt(Person::getAge)
        .sum();
    System.out.println(totalAge);
  }
}

class Person {//no longer mutable

  public String getName() {
    return name;
  }

  public Integer getAge() {
    return age;
  }

  final String name;

  final Integer age;

  //no args constructor removed
  public Person(String name, Integer age) {
    this.name = name;
    this.age = age;
  }

  public static Collection<Person> getPersons() {
    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Vasya", 12));
    persons.add(new Person("Petya", 32));
    persons.add(new Person("Serj", 10));
    persons.add(new Person("Onotole", 18));
    return persons;
  }
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("Person{");
    sb.append("name='").append(name).append('\'');
    sb.append(", age=").append(age);
    sb.append('}');
    return sb.toString();
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接