How to write a Python UDF that returns a dictionary

4

I am using PySpark. I have a Spark dataframe in the following format:

| person_id | person_attributes |
|-----------|-------------------|
| id_1 | "department=Sales__title=Sales_executive__level=junior" |
| id_2 | "department=Engineering__title=Software Engineer__level=entry-level" |

I have written a Python function that takes the person_id and person_attributes and returns JSON in the following format: {"id_1":{"properties":[{"department":"Sales"},{"title":"Sales_executive"},{}]}}. But I don't know how to register it as a UDF in PySpark with the appropriate output type. Here is the Python code:
def create_json_from_string(pid, attribute_string):
    results = []
    attribute_map = {}

    # Split attribute_string into key/value pairs and store them in attribute_map
    if attribute_string != '':
        attribute_string = attribute_string.split("__")  # This will be a list
        for substring in attribute_string:
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)

    for k, v in attribute_map.items():
        results.append({k: v})

    output = {pid: {"properties": results}}
    return output
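For reference, a quick sanity check run outside Spark (the question's function is repeated here so the snippet is self-contained) shows the structure it produces for one row:

```python
import json

# The question's parser, redefined here so this snippet runs standalone
def create_json_from_string(pid, attribute_string):
    results = []
    attribute_map = {}
    if attribute_string != '':
        for substring in attribute_string.split("__"):
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    for k, v in attribute_map.items():
        results.append({k: v})
    return {pid: {"properties": results}}

out = create_json_from_string("id_1", "department=Sales__title=Sales_executive__level=junior")
print(json.dumps(out))
# {"id_1": {"properties": [{"department": "Sales"}, {"title": "Sales_executive"}, {"level": "junior"}]}}
```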

What is your expected output? - Shubham Jain
Each row becomes a column holding one element like {"id_1":{"properties":[{"department":"Sales"},{"title":"Sales_executive"},{}]}}. That column is then written out as JSON. - NG_21
2 Answers

4
You need to modify the function so that it only returns the map built from the string, rather than the full structure. The function can then be applied to a single column instead of the whole row. In outline:

```python
def map_string(text):
    # your code here to map the string
    return mapped_text

df['new_column'] = df['old_column'].apply(map_string)
```
```python
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col

def struct_from_string(attribute_string):
    attribute_map = {}
    if attribute_string != '':
        attribute_string = attribute_string.split("__")  # This will be a list
        for substring in attribute_string:
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    return attribute_map

my_parse_string_udf = spark.udf.register("my_parse_string", struct_from_string,
                                         MapType(StringType(), StringType()))
```

It can then be used like this:

```python
df2 = df.select(col("person_id"), my_parse_string_udf(col("person_attributes")))
```
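Since struct_from_string is plain Python, it can be sanity-checked without a Spark session (the function is repeated here so the sketch runs standalone):

```python
def struct_from_string(attribute_string):
    attribute_map = {}
    if attribute_string != '':
        for substring in attribute_string.split("__"):
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    return attribute_map

# Each "key=value" pair separated by "__" becomes one map entry
print(struct_from_string("department=Sales__title=Sales_executive__level=junior"))
# {'department': 'Sales', 'title': 'Sales_executive', 'level': 'junior'}

# An empty attribute string yields an empty map
print(struct_from_string(""))
# {}
```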

Instead of registering the UDF, I simply used my_parse_string_udf = F.udf(struct_from_string, returnType=MapType(StringType(), StringType())). What is the benefit of registering it? - undefined
The benefit is that you can also use it from SQL. - undefined

1
UDFs are treated as a black box in Spark, so here is a solution based on the DataFrame API, in case you want one.

Spark 2.4+

Create the DataFrame:

```python
from pyspark.sql import functions as f

df = spark.createDataFrame([('id_1', "department=Sales__title=Sales_executive__level=junior"),
                            ('id_2', "department=Engineering__title=Software Engineer__level=entry-level")],
                           ['person_id', 'person_attributes'])
df.show()
```
+---------+--------------------+
|person_id|   person_attributes|
+---------+--------------------+
|     id_1|department=Sales_...|
|     id_2|department=Engine...|
+---------+--------------------+

Convert person_attributes into a map:

```python
df2 = df.select('person_id',
                f.map_from_arrays(
                    f.expr("transform(transform(split(person_attributes,'__'),x->split(x,'=')),y->y[0])"),
                    f.expr("transform(transform(split(person_attributes,'__'),x->split(x,'=')),y->y[1])")).alias('value'))

df2.show(2, False)
```

+---------+-----------------------------------------------------------------------------+
|person_id|value                                                                        |
+---------+-----------------------------------------------------------------------------+
|id_1     |[department -> Sales, title -> Sales_executive, level -> junior]             |
|id_2     |[department -> Engineering, title -> Software Engineer, level -> entry-level]|
+---------+-----------------------------------------------------------------------------+
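The nested transform expression can be hard to read. Here is the same logic written out in plain Python (an illustrative sketch of what each SQL higher-order function does, not Spark code):

```python
s = "department=Sales__title=Sales_executive__level=junior"

# split(person_attributes, '__')
parts = s.split("__")
# transform(..., x -> split(x, '='))
pairs = [x.split("=") for x in parts]
# transform(..., y -> y[0]) and transform(..., y -> y[1])
keys = [y[0] for y in pairs]
values = [y[1] for y in pairs]
# map_from_arrays(keys, values)
value_map = dict(zip(keys, values))

print(value_map)
# {'department': 'Sales', 'title': 'Sales_executive', 'level': 'junior'}
```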

Create the structure you need:

```python
df2.select(f.create_map('person_id', f.create_map(f.lit('properties'), 'value')).alias('json')).toJSON().collect()
```

['{"json":{"id_1":{"properties":{"department":"Sales","title":"Sales_executive","level":"junior"}}}}',
 '{"json":{"id_2":{"properties":{"department":"Engineering","title":"Software Engineer","level":"entry-level"}}}}']


You can either collect the result or keep working with the dataframe. If you collect it (data below being the list returned by toJSON().collect() above), use this:

```python
import json

for i in data:
    d = json.loads(i)
    print(d['json'])
```

{'id_1': {'properties': {'department': 'Sales', 'title': 'Sales_executive', 'level': 'junior'}}}
{'id_2': {'properties': {'department': 'Engineering', 'title': 'Software Engineer', 'level': 'entry-level'}}}

1
Thanks for this approach. The cluster/Spark version I am using does not have this collect_map_from_arrays function, but what you did helps me in other situations too. - NG_21

Content provided by Stack Overflow.