自动增量的pyspark数据框列值

4
我将尝试在数据框中生成一个附加列,其中包含基于全局值的自动递增值。然而,所有行都生成相同的值,并且该值未递增。
以下是代码:
def autoIncrement():
    global rec
    if (rec == 0) : rec = 1 
    else : rec = rec + 1
    return int(rec)

rec=14

UDF

autoIncrementUDF = udf(autoIncrement,  IntegerType())


df1 = hiveContext.sql("select id,name,location,state,datetime,zipcode from demo.target")

df1.withColumn("id2", autoIncrementUDF()).show()

这是结果 df

+---+------+--------+----------+-------------------+-------+---+
| id|  name|location|     state|           datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00|   NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00|   NULL| 15|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00|   NULL| 15|
| 30|Manish| Gurgoan|   Gujarat|2018-03-27 11:00:00|   NULL| 15|
+---+------+--------+----------+-------------------+-------+---+

但我期望以下结果。
+---+------+--------+----------+-------------------+-------+---+
| id|  name|location|     state|           datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00|   NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00|   NULL| 16|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00|   NULL| 17|
| 30|Manish| Gurgoan|   Gujarat|2018-03-27 11:00:00|   NULL| 18|
+---+------+--------+----------+-------------------+-------+---+

非常感谢您的帮助。


由于UDF可以在不同的工作进程中执行,因此Python的global变量没有意义,因为全局变量绑定到进程。 - Susensio
1个回答

5
全局变量绑定到Python进程上。一个UDF可以在某个集群中的不同worker上并行执行,并且应该是确定性的。
你应该使用pyspark.sql.functions模块中的monotonically_increasing_id()函数。
有关详细信息,请查看文档
你需要小心,因为这个函数是动态的而不是粘性的: 如何向Spark DataFrame添加一个持久化行ID列?

2
你的第一个链接已经失效。 - Josh Herzberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接