请看下面的示例:
>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
.map(lambda line: line.split(","))
.toDF(['name','age','height'])
.select(col('name'), col('age').cast('int'), col('height').cast('int')))
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
| Bob| 5| 80|
|Alice| 10| 80|
+-----+---+------+
>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
{'age': 5, 'name': u'Alice', 'height': 80},
{'age': 5, 'name': u'Bob', 'height': 80},
{'age': 10, 'name': u'Alice', 'height': 80}
]
>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}
我用来测试data.txt
的输入:
Alice,5,80
Bob,5,80
Alice,10,80
首先,我们使用pyspark通过读取行来进行加载。然后,我们通过逗号分隔符将行转换为列。接着,我们将native RDD转换为DF并将名称添加到列中。最后,我们将列转换为适当的格式。
然后,我们将所有内容收集到driver,并使用一些python列表推导式将数据转换为所需的形式。我们使用asDict()
方法将Row
对象转换为字典。在输出中,我们可以观察到Alice仅出现了一次,但这当然是因为Alice的键被覆盖了。
请注意,在将结果返回给driver之前,要在pypspark内完成所有处理和过滤。
希望这能有所帮助,谢谢。