Spark RDD 转换为 DataFrame Python

50

我正在尝试将Spark RDD转换为DataFrame。 我已经查看了文档和示例,其中方案被传递给sqlContext.CreateDataFrame(rdd,schema)函数。

但是我有38个列或字段,而且这个数量还会进一步增加。如果我手动给出方案并指定每个字段的信息,那么这将是一项非常繁琐的工作。

是否有其他方法可以在不事先知道列信息的情况下指定模式?


2
如果你有38列,为什么要一开始就使用RDD?为什么不从DataFrame开始呢? - Yaron
1
我正在从Neo4j图数据库中加载数据。这些数据以RDD的形式检索,并且对其有一些依赖关系。 - Jack Daniel
3个回答

77

看,

在Spark中将RDD转换为DataFrame有两种方法。

toDF()createDataFrame(rdd, schema)

我会向您展示如何以动态方式执行此操作。

toDF()

toDF()命令提供了一种将RDD[Row]转换为数据帧的方法。关键是,对象Row()可以接收一个**kwargs参数。因此,有一种简单的方法可以做到这一点。

from pyspark.sql.types import Row

#here you are going to create a function
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

#Now populate that
df = rdd.map(lambda x: Row(**f(x))).toDF()

通过这种方式,您将能够动态创建数据框。

createDataFrame(rdd,schema)

另一种方法是创建动态模式。如何实现呢?

就像这样:

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

df = sqlContext.createDataFrame(rdd, schema)

第二种方法更加清晰易懂...

这就是如何动态创建数据框。


1
请问在执行时间方面,哪种方法是最便宜的? - Ishtiaque Khan
在这种情况下,将使用数据框选项。由于使用PySpark RDD函数,将使用JVM和Python之间的管道来运行从f(x)传递的逻辑,并且在使用DataFrame时,您不需要与Python通信以进行模式定义,因为模式已经在For中构建完成。 - Thiago Baldim
对不起,您的意思是sqlContext.createDataFrame(rdd, schema)选项更好,是吗? - Ishtiaque Khan
是的,对于 PySpark 来说,性能最佳的选项始终是使用 DataFrame。 - Thiago Baldim
9
我会尽力做好翻译。以下是需要翻译的内容:我之所以投了反对票是因为这个代码将所有字段都转换成字符串。原帖的作者表示他事先不知道这些列是什么。他怎么知道它们是字符串类型的? - kingledion

3

我更喜欢Arun的答案,但有一个小问题,我无法评论或编辑该答案。sparkContext没有createDeataFrame函数,而sqlContext有(正如Thiago所提到的)。因此:

from pyspark.sql import SQLContext

# assuming the spark environemnt is set and sc is spark.sparkContext 
sqlContext = SQLContext(sc)
schemaPeople = sqlContext.createDataFrame(RDDName)
schemaPeople.createOrReplaceTempView("RDDName")

另一种方法:from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()spark.createDataFrame(...) - Tim

1
尝试看看是否有效
sc = spark.sparkContext

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(RddName)
schemaPeople.createOrReplaceTempView("RddName")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接