我有一个RDD(我们可以称之为myrdd),其中RDD中的每个记录的格式如下:
[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)]
我想在pyspark中将这个内容转换为DataFrame - 最简单的方法是什么?我有一个RDD(我们可以称之为myrdd),其中RDD中的每个记录的格式如下:
[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)]
我想在pyspark中将这个内容转换为DataFrame - 最简单的方法是什么?你可以尝试使用toDF
方法,只需要添加字段名称即可。
df = rdd.toDF(['column', 'value'])
@dapangmao 的回答启发了我,让我得到了这个解决方案:
my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF()
my_rdd
,则以下内容应该可行。from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# You have a ton of columns and each one should be an argument to Row
# Use a dictionary comprehension to make this easier
def record_to_row(record):
schema = {'column{i:d}'.format(i = col_idx):record[col_idx] for col_idx in range(1,100+1)}
return Row(**schema)
row_rdd = my_rdd.map(lambda x: record_to_row(x))
# Now infer the schema and you have a DataFrame
schema_my_rdd = sqlContext.inferSchema(row_rdd)
# Now you have a DataFrame you can register as a table
schema_my_rdd.registerTempTable("my_table")
我在Spark中没有太多与DataFrames一起工作的经验,但这应该可以解决问题。
AttributeError: 'SQLContext'对象没有'inferSchema'属性
。 - Jérémy在Pyspark中,假设你有一个名为userDF 的数据框。
>>> type(userDF)
<class 'pyspark.sql.dataframe.DataFrame'>
让我们将其转换为RDD(
userRDD = userDF.rdd
>>> type(userRDD)
<class 'pyspark.rdd.RDD'>
现在你可以进行一些操作,例如调用 map 函数:
newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']})
最后,让我们从弹性分布式数据集(RDD)创建一个数据框架(DataFrame)。
newDF = sqlContext.createDataFrame(newRDD, ["food", "name"])
>>> type(ffDF)
<class 'pyspark.sql.dataframe.DataFrame'>
就这些。
之前我在尝试调用时遇到了这个警告信息:
newDF = sc.parallelize(newRDD, ["food","name"] :
.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row inst warnings.warn("Using RDD of dict to inferSchema is deprecated. "
所以不再需要这样做了...