I am using PySpark. I have a Spark DataFrame in the following format:
| person_id | person_attributes                                                    |
|-----------|----------------------------------------------------------------------|
| id_1      | "department=Sales__title=Sales_executive__level=junior"              |
| id_2      | "department=Engineering__title=Software Engineer__level=entry-level" |
I have written a Python function that takes person_id and person_attributes and returns JSON in the following format:
{"id_1":{"properties":[{"department":"Sales"},{"title":"Sales_executive"},{}]}}
However, I don't know how to register it as a UDF in PySpark with the appropriate output type. Here is the Python code:

    def create_json_from_string(pid, attribute_string):
        results = []
        attribute_map = {}
        output = {}

        # Split attribute_string into key/value pairs and store them in attribute_map
        if attribute_string != '':
            attribute_string = attribute_string.split("__")  # This will be a list
            for substring in attribute_string:
                # Split on the first "=" only, so a value may itself contain "="
                k, v = substring.split("=", 1)
                attribute_map[str(k)] = str(v)

        # Turn each key/value pair into its own single-entry dict
        for k, v in attribute_map.items():
            temp = {k: v}
            results.append(temp)

        output = {pid: {"properties": results}}
        return output
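One possible way to approach the registration, offered as a sketch rather than a definitive answer: have the function return a JSON string, so the UDF's output type is simply StringType. The names attributes_to_json, add_json_column, and the person_json column are hypothetical and not part of the original code. The helper is a simplified variant of create_json_from_string (it skips None as well as the empty string and does not deduplicate keys). The PySpark imports sit inside add_json_column so the helper can be tried without a Spark installation.

```python
import json


def attributes_to_json(pid, attribute_string):
    """Parse 'k1=v1__k2=v2' and return a JSON string keyed by pid."""
    properties = []
    if attribute_string:  # skips both None and the empty string
        for pair in attribute_string.split("__"):
            key, value = pair.split("=", 1)  # split on the first '=' only
            properties.append({key: value})
    return json.dumps({pid: {"properties": properties}})


def add_json_column(df):
    """Return df with an extra string column, person_json (hypothetical name)."""
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Wrap the plain Python function as a UDF whose output type is a string.
    to_json_udf = F.udf(attributes_to_json, StringType())
    return df.withColumn(
        "person_json",
        to_json_udf(F.col("person_id"), F.col("person_attributes")),
    )
```

Assuming df is the DataFrame shown above, add_json_column(df).select("person_json").write.text("some/output/path") should write one JSON document per line (the path is a placeholder). If the dict itself is returned instead of a string, the return type would presumably need to mirror the nesting, for example MapType(StringType(), MapType(StringType(), ArrayType(MapType(StringType(), StringType())))).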
A column with elements like {"id_1":{"properties":[{"department":"Sales"},{"title":"Sales_executive"},{}]}}. Then write that column out as JSON. - NG_21