这里提供了一种通用解决方案,不需要事先知道数组长度、使用collect
或使用udf
。不幸的是,这只适用于spark
2.1及以上版本,因为它要求使用posexplode
函数。
假设您有以下DataFrame:
df = spark.createDataFrame(
[
[1, 'A, B, C, D'],
[2, 'E, F, G'],
[3, 'H, I'],
[4, 'J']
]
, ["num", "letters"]
)
df.show()
将 letters
列拆分,然后使用 posexplode
将结果数组与数组中的位置一起展开。接下来使用 pyspark.sql.functions.expr
获取该数组中索引为pos
的元素。
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.show()
现在我们从这个结果创建两个新列。第一个是我们新列的名称,它将是letter
和数组中的索引的连接。第二列将是数组中相应索引处的值。我们通过利用pyspark.sql.functions.expr
的功能来获得后者,该功能允许我们使用列值作为参数。
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.show()
现在我们可以通过对num
进行groupBy
并对DataFrame进行pivot
来完成。将所有内容组合起来,我们得到:
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.groupBy("num").pivot("name").agg(f.first("val"))\
.show()
split_col.getItem (2-n)
。我猜想,像上面的循环为所有项目制作列,然后连接它们可能有效,但我不知道这是否非常高效。 - Chris