在Pyspark Dataframe上将字符串列进行透视

48

我有一个像这样简单的数据框:

rdd = sc.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

df_data.show()
 +---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

我需要按日期进行数据透视:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

一切都按预期进行。但现在我需要对其进行透视并获取一个非数值列:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()

当然,我会得到一个异常:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

我想生成类似以下的内容:

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|DOCK  |  null|  null|
|  0|   A| DOCK |  PORT| DOCK|
|  1|   B|DOCK  |PORT  |  null|
+---+----+------+------+------+

使用 pivot 可以实现吗?

2个回答

80
假设 (id | type | date) 组合是唯一的,且您的目标仅为数据透视而非聚合,则可以使用first函数(或任何其他不受限于数字值的函数):
from pyspark.sql.functions import first

(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(first("ship"))
    .show())

## +---+----+------+------+------+
## | id|type|201601|201602|201603|
## +---+----+------+------+------+
## |  2|   C|  DOCK|  null|  null|
## |  0|   A|  DOCK|  PORT|  PORT|
## |  1|   B|  PORT|  DOCK|  null|
## +---+----+------+------+------+

如果这些假设不正确,您将需要预聚合您的数据。例如,对于最常见的 ship 值:

from pyspark.sql.functions import max, struct

(df_data
    .groupby("id", "type", "date", "ship")
    .count()
    .groupby("id", "type")
    .pivot("date")
    .agg(max(struct("count", "ship")))
    .show())

## +---+----+--------+--------+--------+
## | id|type|  201601|  201602|  201603|
## +---+----+--------+--------+--------+
## |  2|   C|[1,DOCK]|    null|    null|
## |  0|   A|[1,DOCK]|[1,PORT]|[1,PORT]|
## |  1|   B|[1,PORT]|[1,DOCK]|    null|
## +---+----+--------+--------+--------+

3
另一个解决方案是使用collect_set来保留所有的ship值。 - Jacek Laskowski
@Jacek,你能在这里给出解决方案吗? - stack0114106
@stack0114106 将上述代码中的 max(struct 替换为 collect_set,然后你就完成了。不过我正在寻找一个机会将其作为一个完整的答案使用。你知道有哪些问题需要这样的回答吗?;-) - Jacek Laskowski

3

如果有人正在寻找SQL风格的方法。

rdd = spark.sparkContext.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])
df_data.createOrReplaceTempView("df")
df_data.show()

dt_vals=spark.sql("select collect_set(date) from df").collect()[0][0]
['201601', '201602', '201603']

dt_vals_colstr=",".join(["'" + c + "'" for c in sorted(dt_vals)])
"'201601','201602','201603'"

第一部分(注意 f 格式说明符)

spark.sql(f"""
select * from 
(select id , type, date, ship from df)
pivot (
first(ship) for date in ({dt_vals_colstr})
)
""").show(100,truncate=False)

+---+----+------+------+------+
|id |type|201601|201602|201603|
+---+----+------+------+------+
|1  |B   |PORT  |DOCK  |null  |
|2  |C   |DOCK  |null  |null  |
|0  |A   |DOCK  |PORT  |PORT  |
+---+----+------+------+------+

Part-2

spark.sql(f"""
select * from 
(select id , type, date, ship from df)
pivot (
case when count(*)=0 then null 
else struct(count(*),first(ship)) end for date in ({dt_vals_colstr})
)
""").show(100,truncate=False)

+---+----+---------+---------+---------+
|id |type|201601   |201602   |201603   |
+---+----+---------+---------+---------+
|1  |B   |[1, PORT]|[1, DOCK]|null     |
|2  |C   |[1, DOCK]|null     |null     |
|0  |A   |[1, DOCK]|[1, PORT]|[1, PORT]|
+---+----+---------+---------+---------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接