使用Pyspark将多个列合并为一个JSON列

9

我之前曾经问过一个关于Python的问题,现在我需要在PySpark中完成同样的事情。

我有一个类似这样的数据框(df):

|cust_id|address    |store_id|email        |sales_channel|category|
-------------------------------------------------------------------
|1234567|123 Main St|10SjtT  |idk@gmail.com|ecom         |direct  |
|4567345|345 Main St|10SjtT  |101@gmail.com|instore      |direct  |
|1569457|876 Main St|51FstT  |404@gmail.com|ecom         |direct  |

我希望将最后四个字段合并成一个元数据字段,格式为json:

|cust_id|address    |metadata                                                                                     |
-------------------------------------------------------------------------------------------------------------------
|1234567|123 Main St|{'store_id':'10SjtT', 'email':'idk@gmail.com','sales_channel':'ecom', 'category':'direct'}   |
|4567345|345 Main St|{'store_id':'10SjtT', 'email':'101@gmail.com','sales_channel':'instore', 'category':'direct'}|
|1569457|876 Main St|{'store_id':'51FstT', 'email':'404@gmail.com','sales_channel':'ecom', 'category':'direct'}   |

这是我在Python中使用的代码:

cols = [
    'store_id',
    'store_category',
    'sales_channel',
    'email'
]

df1 = df.copy()
df1['metadata'] = df1[cols].to_dict(orient='records')
df1 = df1.drop(columns=cols)

但我希望将其翻译成PySpark代码以使用Spark数据框架;我不想在Spark中使用pandas。

2个回答

29

使用to_json函数创建 JSON 对象!

例子:

from pyspark.sql.functions import *

#sample data
df=spark.createDataFrame([('1234567','123 Main St','10SjtT','idk@gmail.com','ecom','direct')],['cust_id','address','store_id','email','sales_channel','category'])

df.select("cust_id","address",to_json(struct("store_id","category","sales_channel","email")).alias("metadata")).show(10,False)

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","category":"direct","sales_channel":"ecom","email":"idk@gmail.com"}|
+-------+-----------+----------------------------------------------------------------------------------------+

通过传递列的列表来进行 to_json 转换:


ll=['store_id','email','sales_channel','category']

df.withColumn("metadata", to_json(struct([x for x in ll]))).drop(*ll).show()

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","email":"idk@gmail.com","sales_channel":"ecom","category":"direct"}|
+-------+-----------+----------------------------------------------------------------------------------------+

能否传入列的列表而不是它们的实际列名? - DBA108642
3
最好的答案!我要从Kafka -> Spark -> Kafka,这个答案正是我需要的。 - Brian Wylie

2

@Shu提供了一个很好的答案,这里有一个对我的使用情况稍微更好的变体。我正在从Kafka -> Spark -> Kafka,这个一行代码正好做到了我想要的。 struct(*)会将dataframe中的所有字段打包起来。

# Packup the fields in preparation for sending to Kafka sink
kafka_df = df.selectExpr('cast(id as string) as key', 'to_json(struct(*)) as value')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接