在并行Java流中处理随机数

5

我希望生成5个不同的随机数,范围在0-50之间,并在并行执行时对它们进行某些操作。当我写下这段程序时,程序永远没有结束:

new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

我尝试使用 peek 调试它。 我得到了无限数量的 c: 行,50 行 d:,但是 l:s: 行都是零:

new Random().ints(0, 50)
            .peek(d -> System.out.println("c: " + d))
            .distinct()
            .peek(d -> System.out.println("d: " + d))
            .limit(5)
            .peek(d -> System.out.println("l: " + d))
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

我的实现有什么问题吗?


1
无限流(如IntStream.iterate(...))和随机数流之间的一个显着区别是,随机数流并不真正是无限的,而是具有Long.MAX_VALUE大小,并且甚至报告了这一点,这可能会产生有趣的影响... - Holger
这不是此问题的重复,请阅读我的答案。 - Tagir Valeev
3个回答

6
首先,请注意.parallel()会改变整个管道的并行状态,因此它会影响所有操作,而不仅仅是后续的操作。在你的情况下,
new Random().ints(0, 50)
            .distinct()
            .limit(5)
            .parallel()
            .forEach(d -> System.out.println("s: " + d));

与...相同

new Random().ints(0, 50)
            .parallel()
            .distinct()
            .limit(5)
            .forEach(d -> System.out.println("s: " + d));

您不能只并行化管道的一部分。它要么是并行的,要么不是。
现在回到您的问题。由于Random.ints是一个无序流,所以选择了无序的distinctlimit实现,因此它不是这个问题的重复(该问题中的问题在于有序的distinct实现)。这里的问题在于无序的limit()实现。为了减少可能的争用,它不会在每个子任务获得至少128个元素或上游耗尽之前检查在不同线程中找到的元素总数(请参见实现1 << 7 = 128)。在您的情况下,上游的distinct()仅找到50个不同的元素,并绝望地遍历输入以寻找更多元素,但下游的limit()没有发出停止处理的信号,因为它希望在检查是否达到限制之前收集至少128个元素(这不是很明智,因为限制小于128)。因此,要使此功能正常工作,您应选择至少(128 * CPU数量)个不同的元素。在我的4核机器上,使用new Random().ints(0, 512)成功,而new Random().ints(0, 511)则停滞不前。
为了解决这个问题,我建议按顺序收集随机数并在那里创建一个新的流:
int[] ints = new Random().ints(0, 50).distinct().limit(5).toArray();
Arrays.stream(ints).parallel()
      .forEach(d -> System.out.println("s: " + d));

我猜想您希望执行一些昂贵的下游处理操作。在这种情况下,并行生成5个随机数是没有什么用的。这部分操作顺序执行会更快。
更新:已提交bug报告并提交了补丁

2

您调用了ints(0, 50)

返回一个有效无限的伪随机int值流,每个值都符合给定的起点(包括)和终点(不包括)。

我最初认为未终止的IntStream是问题所在,但我复制了该问题。

new Random().ints(0, 50)
            .distinct().limit(5)
            .parallel().forEach(a -> System.out.println(a));

进入无限循环,同时
new Random().ints(0, 50)
            .distinct().limit(5)
            .forEach(a -> System.out.println(a));

正确结束。

我的流知识不是很好,无法解释清楚,但显然并行化不太好使用(可能由于无限流)。


但是当我删除.parallel()时,程序将正确打印5个不同的数字并退出。为什么在限制后添加.parallel()会使执行无限? - janinko
1
@janinko 当你添加parallel()时,整个流都被并行化了,这意味着distinct()必须使用多个线程进行计算。你可能想要收集结果,然后只对其进行并行处理。 - Peter Lawrey
@PeterLawrey,那么我在哪个位置使用.parallel()并不重要吗?我的想法是只有在.parallel()之后指定的事情才会并行执行。 - janinko
@janinko 我认为这是有道理的,然而它并不是这样做的。 - Peter Lawrey

1

您想要做的最接近的选项可能是使用iterateunordered

Random ran = new Random();
IntStream.iterate(ran.nextInt(50), i -> ran.nextInt(50))
    .unordered()
    .distinct()
    .limit(5)
    .parallel()
    .forEach(System.out::println);

使用无限流结合 distinctparallel 可能会很昂贵或导致没有响应。有关更多信息,请参见 API注释此问题


这似乎是有效的,但当我将 IntStream.iterate 替换为 ran.ints(0,50) 时,它会循环。为什么来自 IntStream.iterate 方法的 IntStream 表现不同于来自 Random.ints 方法的 IntStream - janinko
@janinko 你说得完全正确。这种行为差异看起来非常奇怪。我不知道确切的答案,但我怀疑这可能是因为元素在并行化时被分割的方式不同所致。 - dejvuth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接