让我们从一个简单的函数开始,它总是返回一个随机整数:
import numpy as np
def f(x):
return np.random.randint(1000)
并且一个填充了零的 RDD 通过 f
进行映射:
rdd = sc.parallelize([0] * 10).map(f)
由于上述RDD未被持久化,因此我预计每次收集时都会得到不同的输出:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
如果我们忽略数值分布实际上并不随机的事实,那么大体上就是发生了这种情况。问题在于,当我们只考虑第一个元素时,它就开始出现了。
如果我们忽略数值分布实际上并不随机的事实,那么大体上就是发生了这种情况。问题在于,当我们只考虑第一个元素时,它就开始出现了。
assert len(set(rdd.first() for _ in xrange(100))) == 1
或者assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
每次似乎都返回相同的数字。我已经使用Spark 1.2、1.3和1.4在两台不同的机器上重现了这种行为。这里我使用了np.random.randint
,但它与random.randint
的行为方式相同。
这个问题,就像使用collect
时出现的非完全随机结果一样,似乎是Python特有的,我无法通过Scala再现这个问题:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
我在这里错过了一些显而易见的东西吗?
编辑:
结果发现问题的根源是Python RNG实现。引用官方文档:
此模块提供的函数实际上是隐式 random.Random 类实例的绑定方法。您可以实例化自己的 Random 实例,以获得不共享状态的生成器。
我假设NumPy也是这样工作的,然后使用RandomState
实例重写f
,如下所示:
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
这使得程序变慢,但解决了问题。
虽然上面的内容解释了为什么会有从collect中获得不随机结果的情况,但我仍然不明白它如何影响多个动作中的first
/ take(1)
。