在PySpark中生成随机数

15

让我们从一个简单的函数开始,它总是返回一个随机整数:

import numpy as np

def f(x):
    return np.random.randint(1000)

并且一个填充了零的 RDD 通过 f 进行映射:

rdd = sc.parallelize([0] * 10).map(f)

由于上述RDD未被持久化,因此我预计每次收集时都会得到不同的输出:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
如果我们忽略数值分布实际上并不随机的事实,那么大体上就是发生了这种情况。问题在于,当我们只考虑第一个元素时,它就开始出现了。

如果我们忽略数值分布实际上并不随机的事实,那么大体上就是发生了这种情况。问题在于,当我们只考虑第一个元素时,它就开始出现了。

assert len(set(rdd.first() for _ in xrange(100))) == 1
或者
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

每次似乎都返回相同的数字。我已经使用Spark 1.2、1.3和1.4在两台不同的机器上重现了这种行为。这里我使用了np.random.randint,但它与random.randint的行为方式相同。

这个问题,就像使用collect时出现的非完全随机结果一样,似乎是Python特有的,我无法通过Scala再现这个问题:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

我在这里错过了一些显而易见的东西吗?

编辑:

结果发现问题的根源是Python RNG实现。引用官方文档:

此模块提供的函数实际上是隐式 random.Random 类实例的绑定方法。您可以实例化自己的 Random 实例,以获得不共享状态的生成器。

我假设NumPy也是这样工作的,然后使用RandomState实例重写f,如下所示:

import os
import binascii

def f(x, seed=None):
    seed = (
        seed if seed is not None 
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

这使得程序变慢,但解决了问题。

虽然上面的内容解释了为什么会有从collect中获得不随机结果的情况,但我仍然不明白它如何影响多个动作中的first / take(1)


只是想澄清一下:如果我在Spark中使用numpy的随机函数,它是否总是在每个分区中选择相同的结果? 我该如何使用np.random.choice使其变得随机? - member555
它总是在每个分区中选择相同的结果 - 不完全准确,但在单个工作人员计算的值不会独立。如何使用 np.random.choice 使其随机?- 我已在编辑中描述了解决方案。您应该使用单独的状态。由于这相当昂贵,因此您可能希望每个分区初始化一次。 - zero323
你能详细解释一下问题是什么吗?为什么Python的共享状态是一个问题? - member555
@member555 嗯,这是一个很广泛的问题。简而言之,像RNG这样的东西实际上是伪随机数生成器,并且会生成一系列确定性值。在值被更新之前,不同线程多次访问相同的值。一个简单的SO搜索应该可以为您提供更多细节。 - zero323
1
这解决了我的问题,但是“编辑”部分不应该成为答案的一部分吗? - Akavall
@Akavall 可能应该是这样,但这两者之间有9个月的时间差。我很快就解决了其中一部分问题,并希望其他人能填补空白。我会在有空的时候尝试重新组织这个问题。我很高兴它能帮到你。 - zero323
3个回答

4
所以这里的实际问题相对简单。Python中的每个子进程都从其父进程继承其状态:
len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1

在这种特定的情况下,父级状态没有改变的理由,且工作线程的生命周期很短,因此每个子级的状态在每次运行时都完全相同。


3

这似乎是randint的一个错误(或特性)。我看到相同的行为,但只要我更改f,值确实会改变。因此,我不确定此方法的实际随机性...我找不到任何文档,但它似乎使用某些确定性数学算法而不是使用运行机器的更多可变功能。即使我来回移动,返回原始值时数字似乎仍然是相同的...


1
这是一个实现Mersenne Twister的伪随机生成器,但这不应该是问题。问题肯定与共享的“Random”类有关(我已经编辑了问题以反映这一点),但它如何影响“first”输出仍然让我感到困惑。 - zero323

3
针对我的使用情况,大部分解决方案都被埋在问题底部的编辑中。但是,还有另一个复杂性:我想使用相同的函数来生成多个(不同)的随机列。事实证明,Spark假定UDF的输出是确定性的,这意味着它可以跳过使用相同输入的相同函数的后续调用。对于返回随机输出的函数,这显然不是你想要的。
为了解决这个问题,我使用内置的PySpark rand函数为每个我想要的随机列生成一个单独的种子列:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

@F.udf(IntegerType())
def my_rand(seed):
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

seed_expr = (F.rand()*F.lit(4294967295).astype('double')).astype('bigint')
my_df = (
    my_df
    .withColumn('seed_0', seed_expr)
    .withColumn('seed_1', seed_expr)
    .withColumn('myrand_0', my_rand(F.col('seed_0')))
    .withColumn('myrand_1', my_rand(F.col('seed_1')))
    .drop('seed_0', 'seed_1')
)

我使用DataFrame API而不是原始问题的RDD,因为我更熟悉它,但是相同的概念应该适用。请注意:自Spark v2.3以来,似乎可以禁用Scala Spark UDF的确定性假设:https://jira.apache.org/jira/browse/SPARK-20586

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接