Spark Dataframe每次转换/操作后,随机UUID会发生变化。

14

我有一个Spark数据帧,其中包含一个生成的UUID列。然而,每次我对数据帧进行操作或转换时,UUID都会在每个阶段发生更改。

如何只生成一次UUID,并在此后使UUID保持静态。

以下是一些可重现我的问题的示例代码:

def process(spark: SparkSession): Unit = {

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // register an UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
  dfWithUuidWithNewCol.show(false)
}

输出结果为:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+

请注意,每个步骤中UUID都是不同的。

4个回答

14
这是一种预期行为。用户定义的函数必须是确定性的

用户定义的函数必须是确定性的。由于优化,重复调用可能会被消除,或者该函数甚至可能比查询中存在的次数更多地被调用。

如果您想包含不确定性函数并保留输出结果,您应该将中间数据写入持久存储并读取回来。在某些简单情况下,可以使用检查点或缓存,但一般情况下它不能可靠。

如果上游过程是确定性的(首先有随机化),您可以尝试使用rand函数和种子,转换为字节数组,并传递给UUID.nameUUIDFromBytes

另请参阅:有关如何在Scala中使用随机值向现有DataFrame添加新列的说明

注意SPARK-20586引入了deterministic标志,可以禁用某些优化,但不清楚当数据被persisted并且执行器出现故障时它的行为如何。


这个问题只出现在UDF中吗?还是如果我们在map函数中添加列也会出现这个问题? - abalcerek
@abalcerek,我在map函数中遇到了问题(当进行2个“collect”时)。 - Yehezkel
2
我可以确认,当应用于使用UUID.randomUUID()生成随机UUID的UDF时,非确定性标志不会按照广告所述的行为进行。 该UDF仍可能被多次评估。 这可能与SPARK-23599有关。 - Nicus

6

虽然这是一个很老的问题,但我想让大家知道对我有用的方法。它可能会帮助某些人。

你可以使用以下方式使用expr函数来生成唯一的GUID,而且在转换中不会改变。

import org.apache.spark.sql.functions._  
// create dataframe  
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")   
df.createOrReplaceTempView("df")   
df.show(false)

// generate UUID for new column   
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)    

// new transformations 
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)

以下是输出结果:
+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b   |2   |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c   |3   |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+

它抛出了异常 错误:未找到值表达式 - Manish Jain
1
我认为你的代码顶部没有导入语句。import org.apache.spark.sql.functions._ - Nikunj Kakadiya

2

我有一个 PySpark 版本:

from pyspark.sql import functions as f

pdataDF=dataDF.withColumn("uuid_column",f.expr("uuid()"))
display(pdataDF)
pdataDF.write.mode("overwrite").saveAsTable("tempUuidCheck")

-1
试一下这个:
df.withColumn("XXXID", lit(java.util.UUID.randomUUID().toString))

它的工作方式与以下不同:

val generateUUID = udf(() => java.util.UUID.randomUUID().toString)
df.withColumn("XXXCID", generateUUID() )

我希望这有所帮助。

Pawel


3
所有数据将使用相同的UUID。 - Artur Sukhenko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接