将pyspark.sql.dataframe.DataFrame类型的数据框转换为字典

19

我有一个pyspark数据框,需要将其转换为Python字典。

下面的代码是可复制的:

from pyspark.sql import Row
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)])
df = rdd.toDF()

一旦我获得了这个数据框,我需要将其转换为字典。

我尝试过像这样:

df.set_index('name').to_dict()

但是它会出错。我该如何实现这个需求?

5个回答

35
请看下面的示例:
>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
            .map(lambda line: line.split(","))
            .toDF(['name','age','height'])
            .select(col('name'), col('age').cast('int'), col('height').cast('int')))

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|  Bob|  5|    80|
|Alice| 10|    80|
+-----+---+------+

>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
    {'age': 5, 'name': u'Alice', 'height': 80}, 
    {'age': 5, 'name': u'Bob', 'height': 80}, 
    {'age': 10, 'name': u'Alice', 'height': 80}
]

>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}

我用来测试data.txt的输入:

Alice,5,80
Bob,5,80
Alice,10,80

首先,我们使用pyspark通过读取行来进行加载。然后,我们通过逗号分隔符将行转换为列。接着,我们将native RDD转换为DF并将名称添加到列中。最后,我们将列转换为适当的格式。

然后,我们将所有内容收集到driver,并使用一些python列表推导式将数据转换为所需的形式。我们使用asDict()方法将Row对象转换为字典。在输出中,我们可以观察到Alice仅出现了一次,但这当然是因为Alice的键被覆盖了。

请注意,在将结果返回给driver之前,要在pypspark内完成所有处理和过滤。

希望这能有所帮助,谢谢。


2
嗨Fokko,list_persons的打印结果对我来说是"<map object at 0x7f09000baf28>"。能帮忙吗? - Taka
3
将列表包装在映射周围,即list_persons = list(map(lambda row: row.asDict(), df.collect()))。 - Naufal
太棒了,这是一个极好的方式! - Mike Williamson

26

您需要先使用toPandas()将其转换为pandas.DataFrame,然后可以在转置的数据帧上使用to_dict()方法,并带有orient ='list'参数:

df.toPandas().set_index('name').T.to_dict('list')
# Out[1]: {u'Alice': [10, 80]}

但是你的输出不正确,对吧?我想要的输出应该是这样的 {name: [age, height]} - Hardik Gupta
{Alice: [5,80]} 的输出应为:{爱丽丝: [5,80]},不含‘u’。 - Hardik Gupta
8
我不建议在这里使用Pandas。Pandas是一个庞大的依赖库,对于这样一个简单的操作来说,它并非必需品。 - Fokko Driesprong
3
为什么应该在问题中分享期望的输出,并且为什么年龄是5而不是10?如果你想创建一个字典,你应该拥有唯一的记录。 - mtoto

5
RDDs内置了函数asDict(),可以将每一行表示为一个字典。
如果您有一个dataframe df,则需要将其转换为rdd并应用asDict()。
new_rdd = df.rdd.map(lambda row: row.asDict(True))

接下来可以使用新的rdd执行常规的python map操作,例如:

# You can define normal python functions like below and plug them when needed
def transform(row):
    # Add a new key to each row
    row["new_key"] = "my_new_value"
    return row

new_rdd = new_rdd.map(lambda row: transform(row))

3

一种简单的方法是收集行RDD并使用字典推导式迭代它。在这里,我将尝试演示类似的内容:

假设有一个电影数据框:

movie_df

movieId avg_rating
1 3.92
10 3.5
100 2.79
100044 4.0
100068 3.5
100083 3.5
100106 3.5
100159 4.5
100163 2.9
100194 4.5
我们可以使用字典推导式并像下面这样迭代行RDD:
movie_dict = {int(row.asDict()['movieId']) : row.asDict()['avg_rating'] for row in movie_avg_rating.collect()}
print(movie_dict)
{1: 3.92,
 10: 3.5,
 100: 2.79,
 100044: 4.0,
 100068: 3.5,
 100083: 3.5,
 100106: 3.5,
 100159: 4.5,
 100163: 2.9,
 100194: 4.5}

在非参数化脚本中,row.movieId: row.avg_rating 会更清晰。 - Davide

0
这是一个适用于非常简单情况的两行代码。更多灵活的情况可能会使用lambda函数和asDict来生成值。
假设DataFrame df包含keyval字符串,它们表示列名。
list = df.select(key, val).collect()
dict = {row[key]: row[val] for row in list }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接