PySpark dataframe to_json() function


I have a dataframe as follows:

>>> df.show(10,False)
+-----+----+---+------+
|id   |name|age|salary|
+-----+----+---+------+
|10001|alex|30 |75000 |
|10002|bob |31 |80000 |
|10003|deb |31 |80000 |
|10004|john|33 |85000 |
|10005|sam |30 |75000 |
+-----+----+---+------+

Converting the entire row of df into a new column "jsonCol":
>>> newDf1 = df.withColumn("jsonCol", to_json(struct([df[x] for x in df.columns])))
>>> newDf1.show(10,False)
+-----+----+---+------+--------------------------------------------------------+
|id   |name|age|salary|jsonCol                                                 |
+-----+----+---+------+--------------------------------------------------------+
|10001|alex|30 |75000 |{"id":"10001","name":"alex","age":"30","salary":"75000"}|
|10002|bob |31 |80000 |{"id":"10002","name":"bob","age":"31","salary":"80000"} |
|10003|deb |31 |80000 |{"id":"10003","name":"deb","age":"31","salary":"80000"} |
|10004|john|33 |85000 |{"id":"10004","name":"john","age":"33","salary":"85000"}|
|10005|sam |30 |75000 |{"id":"10005","name":"sam","age":"30","salary":"75000"} |
+-----+----+---+------+--------------------------------------------------------+

Instead of converting the entire row into a JSON string as in the step above, I need a way to include only a few columns, selected based on the values of the fields. In the command below I have provided a sample condition.

However, as soon as I started using the when function, the column names (keys) in the resulting JSON string disappeared. The columns are only identified by their positions (col1, col2, ...), not by their actual column names (keys).

>>> newDf2 = df.withColumn("jsonCol", to_json(struct([ when(col(x)!="  ",df[x]).otherwise(None) for x in df.columns])))
>>> newDf2.show(10,False)
+-----+----+---+------+---------------------------------------------------------+
|id   |name|age|salary|jsonCol                                                  |
+-----+----+---+------+---------------------------------------------------------+
|10001|alex|30 |75000 |{"col1":"10001","col2":"alex","col3":"30","col4":"75000"}|
|10002|bob |31 |80000 |{"col1":"10002","col2":"bob","col3":"31","col4":"80000"} |
|10003|deb |31 |80000 |{"col1":"10003","col2":"deb","col3":"31","col4":"80000"} |
|10004|john|33 |85000 |{"col1":"10004","col2":"john","col3":"33","col4":"85000"}|
|10005|sam |30 |75000 |{"col1":"10005","col2":"sam","col3":"30","col4":"75000"} |
+-----+----+---+------+---------------------------------------------------------+

I need to use the when function but still get the actual column names as keys, as in newDf1. Can anyone help me?

1 Answer

You used conditional expressions as the columns inside the struct function, and such derived columns are auto-named col1, col2, and so on, so you need to use alias to restore the original names.
from pyspark.sql import functions as F
newDf2 = df.withColumn("jsonCol", F.to_json(F.struct([F.when(F.col(x)!="  ",df[x]).otherwise(None).alias(x) for x in df.columns])))
newDf2.show(truncate=False)
