Spark - 随机数生成

13

我编写了一个方法,必须考虑一个随机数来模拟伯努利分布。我使用 random.nextDouble 生成介于0和1之间的数字,然后根据该值和我的概率参数做出决策。

我的问题是在for循环映射函数的每次迭代中,Spark生成相同的随机数。我正在使用 DataFrame API。我的代码遵循以下格式:

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble())
    }, myDF.schema)
}

这是这个类:

class myClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q) // do something
    else // do something else
  }
}

每次调用myMethod时,我需要一个新的随机数。我也尝试在我的方法内部使用java.util.Random生成数字(scala.util.Random v10不扩展Serializable),如下所示,但在每个for循环中仍然得到相同的数字。

val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()

我做了一些研究,似乎这与Spark的确定性有关。

4个回答

18

只需使用 SQL 函数 rand

import org.apache.spark.sql.functions._

//df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+


df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+

1
这并没有完全解决我的问题,但这是一个优雅的解决方案,我将来可能会使用它,所以+1。 - Brian

6
根据这篇文章,最佳解决方案不是在map中放置new scala.util.Random,也不是完全放在外部(即驱动程序代码),而是放在一个中间的mapPartitionsWithIndex中:
import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
   val rand = new scala.util.Random(indx+myAppSeed)
   iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}

1
曾经需要维护一个使用了这种解决方案的代码,并希望与社区分享这个解决方案存在缺陷,可能会严重影响您的统计分析,请注意。当您有一个分区>1的rdd时,每个分区的随机数序列将以新的种子和不同的数字重新开始,但它可能会改变整个序列的“特征”。我的建议是:不要使用这种方法。 - d-xa
@d-xa感谢您的评论。您能推荐一种替代方法吗? - leo9r
如果您使用这种方法,我建议将myRDD的分区设置为1。 - d-xa

5
相同的序列被重复的原因是,在数据被分割之前,随机发生器会被创建并初始化一个种子。然后每个分区从相同的随机种子开始。这可能不是最有效的方法,但以下内容应该可以实现:
val myClass = new MyClass()
val M = 3

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{ 
       val rand = scala.util.Random
       row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble())
    }, myDF.schema)
}

我稍微修改了一下以解决我的问题。我将随机值传递到我的方法中,并从那里生成随机数。这解决了我的问题,但由于可序列化的原因,我不得不使用 java.util.Random - Brian

1
使用Spark Dataset API,可能用于累加器:
df.withColumn("_n", substring(rand(),3,4).cast("bigint"))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接