从列表RDD创建Spark DataFrame

15

我有一个RDD(我们可以称之为myrdd),其中RDD中的每个记录的格式如下:

[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)]
我想在pyspark中将这个内容转换为DataFrame - 最简单的方法是什么?

从你的问题中并不完全清楚你遇到了什么困难。是因为你有太多列吗?还是因为你的RDD记录是元组列表? - Kyle Heuton
4个回答

32

你可以尝试使用toDF方法,只需要添加字段名称即可。

df = rdd.toDF(['column', 'value'])

这个答案可行,我下面发布的解决方案(基于你的答案)可以将上述描述的RDD转换为DataFrame。 - mgoldwasser
如果您不知道列名或想使用其他数据框的列,该怎么办?我有一个相关的问题:https://dev59.com/SLH3oIgBc1ULPQZFNcWx?noredirect=1#comment125309949_70882076 - Sushil Verma

15

@dapangmao 的回答启发了我,让我得到了这个解决方案:


my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF()

4
请参考DataFrame文档,使本示例能够正常运行。假设你的RDD被命名为my_rdd,则以下内容应该可行。
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# You have a ton of columns and each one should be an argument to Row
# Use a dictionary comprehension to make this easier
def record_to_row(record):
    schema = {'column{i:d}'.format(i = col_idx):record[col_idx] for col_idx in range(1,100+1)}
    return Row(**schema)


row_rdd = my_rdd.map(lambda x: record_to_row(x))

# Now infer the schema and you have a DataFrame
schema_my_rdd = sqlContext.inferSchema(row_rdd)

# Now you have a DataFrame you can register as a table
schema_my_rdd.registerTempTable("my_table")

我在Spark中没有太多与DataFrames一起工作的经验,但这应该可以解决问题。


你可能需要在创建sqlContext之后添加一行代码来加载implicits库:"import sqlContext.implicits._"。请参阅https://spark.apache.org/docs/1.3.0/sql-programming-guide.html。 - Glenn Strycker
这不是只有Scala才能做到的事情吗?我的答案用Python写的。 - Kyle Heuton
我得到了以下错误信息:AttributeError: 'SQLContext'对象没有'inferSchema'属性 - Jérémy

1

在Pyspark中,假设你有一个名为userDF 的数据框。

>>> type(userDF)
<class 'pyspark.sql.dataframe.DataFrame'>

让我们将其转换为RDD(

userRDD = userDF.rdd
>>> type(userRDD)
<class 'pyspark.rdd.RDD'>

现在你可以进行一些操作,例如调用 map 函数:

newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']})

最后,让我们从弹性分布式数据集(RDD)创建一个数据框架(DataFrame)。

newDF = sqlContext.createDataFrame(newRDD, ["food", "name"])

>>> type(ffDF)
<class 'pyspark.sql.dataframe.DataFrame'>

就这些。

之前我在尝试调用时遇到了这个警告信息:

newDF = sc.parallelize(newRDD, ["food","name"] : 

.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row inst  warnings.warn("Using RDD of dict to inferSchema is deprecated. "

所以不再需要这样做了...


如果每行有很多列,而且潜在地每行的定义都不同,应该怎么办? - ulkas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接