如何将包含字典的列表转换为Pyspark数据框

31
我想将我的字典列表转换为DataFrame。这是列表:
mylist = 
[
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]

这是我的代码:

from pyspark.sql.types import StringType

df = spark.createDataFrame(mylist, StringType())

df.show(2,False)

+-----------------------------------------+
|                                    value|
+-----------------------------------------+
|{type_activity_id=1,type_activity_id=xxx}|
|{type_activity_id=2,type_activity_id=yyy}|
|{type_activity_id=3,type_activity_id=zzz}|
+-----------------------------------------+

我认为我应该为每个列提供一些映射和类型,但我不知道该怎么做。

更新:

我也尝试了这个:

schema = ArrayType(
    StructType([StructField("type_activity_id", IntegerType()),
                StructField("type_activity_name", StringType())
                ]))
df = spark.createDataFrame(mylist, StringType())
df = df.withColumn("value", from_json(df.value, schema))

但是我得到了null值:

+-----+
|value|
+-----+
| null|
| null|
+-----+
4个回答

39

过去,您只需将字典传递给spark.createDataFrame()即可,但现在这已经被弃用:

mylist = [
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]
df = spark.createDataFrame(mylist)
#UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
#  warnings.warn("inferring schema from dict is deprecated,"

正如这个警告信息所说,你应该使用pyspark.sql.Row

from pyspark.sql import Row
spark.createDataFrame(Row(**x) for x in mylist).show(truncate=False)
#+----------------+------------------+
#|type_activity_id|type_activity_name|
#+----------------+------------------+
#|1               |xxx               |
#|2               |yyy               |
#|3               |zzz               |
#+----------------+------------------+

在这里我使用了**关键字参数解包)将字典传递给Row构造函数。


谢谢。你知道它为什么被弃用了吗? - Markus
1
我不确定为什么。顺便说一下,这可能比转换成/从json更快。 - pault
1
但是当每个字典(数组元素)的结构不同时,这可能无法正常工作。 - Adiga
1
使用 PySpark 3.0.0 中的 spark.createDataFrame(Row(**x) for x in mylist) 方法时,我遇到了下游问题,其中值被放置在错误的列中。可能与 https://issues.apache.org/jira/browse/SPARK-26200 相关。 - Daniel Himmelstein
如何确保字典中的值是正确的类型,或者在必要时进行类型转换? - Gadam

16

你可以像这样做。你会得到一个有两列的数据框。

mylist = [
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]

myJson = sc.parallelize(mylist)
myDf = sqlContext.read.json(myJson)

输出:

+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|               1|               xxx|
|               2|               yyy|
|               3|               zzz|
+----------------+------------------+

3
如果 mylist 是一个 RDD,你可以使用 spark.read.json(sc.parallelize(mylist)) 进行操作。 - pissall
不幸的是,对于某些记录,它会出现“_corrupt_record”错误。 - Andrew Matiuk
{'fail': None}:记录为空。 - Andrew Matiuk

5

在Spark 2.4版本中,可以通过以下方式直接完成:

df = spark.createDataFrame(mylist)

>>> mylist = [
...   {"type_activity_id":1,"type_activity_name":"xxx"},
...   {"type_activity_id":2,"type_activity_name":"yyy"},
...   {"type_activity_id":3,"type_activity_name":"zzz"}
... ]
>>> df1=spark.createDataFrame(mylist)
>>> df1.show()
+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|               1|               xxx|
|               2|               yyy|
|               3|               zzz|
+----------------+------------------+

1
它仍然给我这个警告,即 UserWarning: inferring schema from dict is deprecated, please use pyspark.sql.Row instead - Adiga

0

当我从字典列表创建dataframe时,我也遇到了同样的问题。我使用namedtuple解决了这个问题。

以下是我使用提供的数据的代码。

from collections import namedtuple
final_list = []
mylist = [{"type_activity_id":1,"type_activity_name":"xxx"},
          {"type_activity_id":2,"type_activity_name":"yyy"}, 
          {"type_activity_id":3,"type_activity_name":"zzz"}
         ]
ExampleTuple = namedtuple('ExampleTuple', ['type_activity_id', 'type_activity_name'])

for my_dict in mylist:
    namedtupleobj = ExampleTuple(**my_dict)
    final_list.append(namedtupleobj)

sqlContext.createDataFrame(final_list).show(truncate=False)

输出

+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|1               |xxx               |
|2               |yyy               |
|3               |zzz               |
+----------------+------------------+

我的版本信息如下

spark: 2.4.0
python: 3.6

不必要有my_list变量。因为它是可用的,我已经使用它来创建namedtuple对象,否则可以直接创建namedtuple对象。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接