Java 8中,在forEach中对字段的操作会干扰Streams

5

考虑以下使用Java 8流的不太正经的程序:

private int biggestInt;

private void run() {
    ExecutorService executor = Executors.newWorkStealingPool();

    List<Callable<Integer>> callables = new ArrayList<>();

    for (int i = 0; i<50; i++) {
        callables.add(randomInt());
    }

    try {
        executor.invokeAll(callables)
            .stream()
            .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
            .forEach(this::compareBiggestInt);
    } catch (InterruptedException e) { /* do nothing */ }
}

private Callable<Integer> randomInt() {
    return () -> {
        Random random = new Random(System.currentTimeMillis());
        return random.nextInt();
    };
}

private void compareBiggestInt(Integer in) {
    if (in > biggestInt)
        biggestInt = in;
}

我的问题是,forEach(this::compareBiggestInt)是否并行执行,从而会在biggestInt中引入竞争条件?
如果是这样,我该如何避免这种竞争条件?例如,我是否可以像下面这样更改方法?
private synchronized void compareBiggestInt(Integer in) {[...]}

任何帮助都是受到欢迎的!
5个回答

2
forEach 方法不会在并行流中执行。实际上,执行异步任务的是 executorStream#map 操作将等待所有的 Future 完成。 如果 您想要在并行流中执行操作,应该使用规约操作Stream#reduce。例如:
biggestInt = executor.invokeAll(callables)
        .parallelStream()
        .map(...)// same with yours
        .reduce(BinaryOperator.maxBy(Comparator.naturalOrder()))
        .orElse(null);

3
你好,holi-java,你还好吗?关于措辞的细节问题...... Stream.reduce 不是可变规约,因为中间结果没有在可变结构中累积。Stream 的可变规约操作是 collect。 - fps
@FedericoPeraltaSchaffner 你好,我刚刚复制了文档链接。我认为这是正确的。你可以在Collector类的描述中看到它。 - holi-java
2
但是您已经链接到了包文档的Reduction部分,这与其后面的Mutable reduction部分不同。reduceReductioncollectMutable reduction - Holger
3
我不知道“该文件”应该是什么。您已经链接到java.util.stream包的文档,其中描述了几个不同的概念。Reduction是其中之一,描述了该概念及其相关方法reduceMutable reduction是另一个概念,在随后的部分中使用相关方法collect进行描述。reduce方法的文档清楚地链接到Reduction,但是,如果您向下滚动到下一个部分,您将进入Mutable reduction,但是如果您读取这两个部分,您应该注意到它是新的部分。 - Holger
3
有另外一点需要注意,只有 BinaryOperator 中的 maxBy 需要一个 Comparator,所以操作必须是 .reduce(BinaryOperator.maxBy(Comparator.naturalOrder())),但你可以通过 Stream API 提供的.max(Comparator.naturalOrder()) 简化此操作,它在内部实现上执行的也是相同的操作。 - Holger
显示剩余3条评论

2

这里有几个问题。首先:

return () -> {
    Random random = new Random(System.currentTimeMillis());
    return random.nextInt();
};

执行速度非常快(我可以轻松重现),因此这将始终返回相同的值。我建议您至少删除那个 millis
private static Callable<Integer> randomInt() {
    return () -> {
        Random random = new Random();
        int x = random.nextInt(100);
        System.out.println(x);
        return x;
    };
}

甚至更好的方法是使用ThreadLocalRandom.current().nextInt(100)

我还将nextInt更改为返回范围内的值,最高到[0.. 100],因为nextInt可能返回负值,想象一下你返回了50个负值,然后您的最大值将是zero(默认值)或biggestInt;这显然是错误的。

然后你的流是sequential,在每个map操作中你都会阻塞,直到那个Future.get完成。所以你的forEach只能由一个线程执行。


@Sven793 你可以使用System.nanoTime()来获取纳秒级别的精度,这将提高获得更好随机数的概率。 - Eugene
@Sven793 先说一下正确的问题,既然你知道 forEach 只会被单个线程运行,如果你在流中添加 parallel,那结果会是什么呢? - Eugene
1
@Sven793,你的意思是你添加了“parallel”并且看到流以并行方式执行?这是非常预期的。让我重新表述一下我的话:对于相同的数据,添加“parallel”会显示与顺序流不同的最大值吗? - Eugene
1
@Eugene,请注意,System.nanoTime()并不一定产生实际纳秒精度的时间戳。我记得有些Windows机器没有纳秒精度的时钟,因此在那里,nanoTime()返回被截断到系统时钟支持的毫秒精度的时间戳。 - M. Prokhorov
2
咳咳,不要忘记这些“Callable”应该在并行运行,因为它们被传递给“invokeAll”。因此,即使在速度缓慢的机器和高精度计时器下,当使用系统时间作为种子时,其中几个可能会产生相同的值。那么,为什么不使用“ThreadLocalRandom.current().nextInt(100)”呢?自Java 7以来,这将比为每个数字创建本地“Random”实例更快... - Holger
显示剩余4条评论

2
不,forEach 不是并行执行的。如果是这样,就会违反使用 stream()parallelStream() 时期望的 forEach 的一般约定,这与您引入 ExecutorService 的事实无关。 invokeAll() 实际上返回一个 List,其中包含已经完成或超时的 Future 实例。因此,在您与流进行交互之前,并行部分已经完成。

0

你没有使用parallel流,所以你的流是顺序的。如果你想确保你的流是顺序完成的,请在你的流中添加.sequential()方法。

来自文档

default Stream<E> stream()  
Returns a sequential Stream with this collection as its source.

0

assuming that you are running the streams in parallel (I changed the code to use "parallelStream"), you have to protect all changes to the shared mutable variables.

for example, in code below I'm using "synchronized" in method "compareBiggestInt" to protect all accesses to variable "biggestInt". (if you remove "synchronized" and run below code you can see that there is indeed a race condition in method "compareBiggestInt")

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ParallelStreamExample  {

    private volatile int biggestInt;

    public static void main(String[] args) {
        ParallelStreamExample parallelStreamExample = new ParallelStreamExample();
        parallelStreamExample.doTheWork();
    }



    private void doTheWork() {
        ExecutorService executor = Executors.newWorkStealingPool();

        List<Callable<Integer>> callables = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            callables.add(randomInt());
        }

        try {
            executor.invokeAll(callables)
                    .parallelStream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            throw new IllegalStateException(e);
                        }
                    })
                    .forEach(this::compareBiggestInt);
        } catch (InterruptedException e) { /* do nothing */ }
    }

    private Callable<Integer> randomInt() {
        return () -> {
            Random random = new Random();
            return random.nextInt(10);
        };
    }

    private synchronized void compareBiggestInt(Integer in)  {
        System.out.println("in:" + in + " - current biggestint = " + biggestInt);
        if (in > biggestInt) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            biggestInt = in;
        }
        System.out.println("in:" + in + " - current biggestint = " + biggestInt);
    }
}


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接