我有一个从csv文件读取的Spark DataFrame,读取方式如下:
df = ss.read \
.format("csv") \
.option("delimiter", ";") \
.option("header", "false") \
.option("inferSchema", "true") \
.option("escape", "\"") \
.option("multiline", "true") \
.option("wholeFile", "true") \
.load(file_path)
数据框的格式如下:
|cod_cli|article_name|rank|
|123 |art_1 |1 |
|123 |art_2 |2 |
|123 |art_3 |3 |
|456 |art_4 |1 |
|456 |art_5 |2 |
|456 |art_6 |3 |
我希望按照 cod_cli 列对元素进行分组,并创建多个列,每个列对应分组集合中的一个产品,以字典键值对的形式表示,其中键为列名,值为与该列名相关联的数值,形式如下:
|cod_cli|Product 1 |Product 2 |Product 3 |
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
字典的值可以是字符串(更好)或地图。
我尝试了这种方式:
df = df \
.groupBy(F.col("cod_cli")) \
.agg(F.collect_list(F.array("cod_art","rank")))
但是这样做,我会创建一个具有所有分组元素的数组列的列。
请问是否有人可以帮助我?
谢谢
更新
提出的解决方案如下:
df = df.withColumn(
"Product",
F.to_json(
F.struct(F.col("cod_art"), F.col("rank"))
)
)
我这样创建了一个名为“Product”的列,并使用所需的json字符串示例创建了它,例如{cod_art: art_1, rank: 1}
。
然后:
df = df \
.groupBy(F.col("cod_cli")) \
.pivot("rank") \
.agg(F.first("Product"))
通过这种方式,我可以为每个产品创建一列,按照cod_cli属性进行分组,并处理有多于3个产品的列的情况:
|cod_cli|1 |2 |3
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|