考虑以下数据框:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
有没有一种方法可以直接修改 ArrayType()
列的 "names"
字段,而不使用 udf
,并对每个元素应用一个函数?
例如,假设我想将函数 foo
应用于 "names"
列。 (我将使用函数 str.upper
的示例只是为了说明目的,但我的问题涉及可应用于可迭代对象元素的任何有效函数。)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: 列不可迭代
我可以使用udf
来完成这个任务:
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
在这个具体的例子中,我可以通过拆分列、调用 pyspark.sql.functions.upper()
然后执行 groupBy
和 collect_list
来避免使用 udf
:
df.select('type', f.explode('names').alias('name'))\
.withColumn('name', f.upper(f.col('name')))\
.groupBy('type')\
.agg(f.collect_list('name').alias('names'))\
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
但是这需要很多代码才能完成一些简单的任务。是否有更直接的方法来使用spark-dataframe函数迭代ArrayType()
元素?