使用pyspark从元组列表创建DataFrame

13

我正在使用 simple-salesforce 包从 SFDC 提取数据。我使用 Python3 进行脚本编写以及 Spark 1.5.2 进行处理。

我创建了一个包含以下数据的 RDD:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')]
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')]
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
...

这些数据存储在名为v_rdd的RDD中。

我的模式看起来像这样:

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true)))

我正在尝试从这个RDD创建DataFrame:

sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema)
我打印出了我的数据框:
sqlDataFrame.printSchema()

获得以下内容:

+--------------------+--------------------+--------------------+
|                  Id|  PackSize|                          Name|
+--------------------+--------------------+--------------------+
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|

我希望看到实际的数据,就像这样:

+------------------+------------------+--------------------+
|                Id|PackSize|                          Name|
+------------------+------------------+--------------------+
|a0w1a0000003xB1A  |               1.0|       A            |
|a0w1a0000003xAAI  |               1.0|       B            |
|a0w1a00000xB3AAI  |              30.0|       C            |

你能帮我识别出我在这里做错了什么吗。

我的Python脚本很长,我不确定人们是否方便查看它,因此我只发布了我遇到问题的部分。

非常感谢您提前的帮助!

1个回答

25

嘿,下次能否提供一个可运行的示例呢?那会更容易理解。

按照Spark文档的方式创建DataFrame,你介绍RDD的方法基本上是奇怪的。

>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

对于你的例子,你可以按照以下方式创建所需的输出:

# Your data at the moment
data = sc.parallelize([ 
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')],
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')],
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
    ])
# Convert to tuple
data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1]))

# Define schema
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("Packsize", StringType(), True),
    StructField("Name", StringType(), True)
])

# Create dataframe
DF = sqlContext.createDataFrame(data_converted, schema)

# Output
DF.show()
+----------------+--------+----+
|              Id|Packsize|Name|
+----------------+--------+----+
|a0w1a0000003xB1A|     1.0|   A|
|a0w1a0000003xAAI|     1.0|   B|
|a0w1a00000xB3AAI|    30.0|   C|
+----------------+--------+----+

希望这可以帮到你。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接