创建Spark DataFrame。无法推断类型的模式。

93

有人可以帮我解决我在Spark DataFrame中遇到的问题吗?

当我执行myFloatRDD.toDF()时,出现错误:

TypeError:无法推断出类型为“float”的模式

我不明白为什么会这样...

示例:

myFloatRdd = sc.parallelize([1.0,2.0,3.0])
df = myFloatRdd.toDF()

谢谢

4个回答

147

SparkSession.createDataFrame用于创建DataFrame,需要传入一个RDD或者list类型的Rowtuplelistdict*对象,或者提供带有DataType的模式。尝试将浮点数转换为元组,例如:

myFloatRdd.map(lambda x: (x, )).toDF()

或者更好的是:

from pyspark.sql import Row

row = Row("val") # Or some other column name
myFloatRdd.map(row).toDF()

要从标量列表创建一个DataFrame对象,您需要直接使用SparkSession.createDataFrame方法,并提供一个模式***:

from pyspark.sql.types import FloatType

df = spark.createDataFrame([1.0, 2.0, 3.0], FloatType())

df.show()

## +-----+
## |value|
## +-----+
## |  1.0|
## |  2.0|
## |  3.0|
## +-----+

但对于简单的范围,最好使用SparkSession.range

from pyspark.sql.functions import col

spark.range(1, 4).select(col("id").cast("double"))

* 不再受支持。

** Spark SQL 还为暴露 __dict__ 的 Python 对象提供了有限的模式推断支持。

*** 仅在 Spark 2.0 或更高版本中受支持。


1
我对Spark是个新手。你能解释一下 myFloatRdd.map(lambda x: (x, )).toDF() 这个方法是如何解决问题的吗? map(lambda x: (x,)) 只是把RDD对象转换成了一列行数据吗? - honeybadger
@kasa 对于元组(-> struct)有推理映射,但对于标量没有。 - zero323
1
通过使用第一个选项,我们可以在同一行中提供列名:rdd.map(lambda x: (x, )).toDF(['colName']) - ZygD

0
from pyspark.sql.types import IntegerType, Row

mylist = [1, 2, 3, 4, None ]
l = map(lambda x : Row(x), mylist)
# notice the parens after the type name
df=spark.createDataFrame(l,["id"])
df.where(df.id.isNull() == False).show()

基本上,您需要将int初始化为Row(),然后我们可以使用模式。

0
使用反射推断模式
from pyspark.sql import Row
# spark - sparkSession
sc = spark.sparkContext

# Load a text file and convert each line to a Row.
orders = sc.textFile("/practicedata/orders")
#Split on delimiters
parts = orders.map(lambda l: l.split(","))
#Convert to Row
orders_struct = parts.map(lambda p: Row(order_id=int(p[0]), order_date=p[1], customer_id=p[2], order_status=p[3]))
for i in orders_struct.take(5): print(i)
#convert the RDD to DataFrame

orders_df = spark.createDataFrame(orders_struct)
以编程方式指定模式
from pyspark.sql import Row
# spark - sparkSession
sc = spark.sparkContext

# Load a text file and convert each line to a Row.
orders = sc.textFile("/practicedata/orders")
#Split on delimiters
parts = orders.map(lambda l: l.split(","))
#Convert to tuple
orders_struct = parts.map(lambda p: (p[0], p[1], p[2], p[3].strip()))

#convert the RDD to DataFrame

orders_df = spark.createDataFrame(orders_struct)

# The schema is encoded in a string.
schemaString = "order_id order_date customer_id status"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = Struct

ordersDf = spark.createDataFrame(orders_struct, schema)

类型(字段)


你好!欢迎来到StackOverflow。如果你认为对已接受的“答案”有所补充,请清晰地表达,并避免添加未经解释的代码片段。 - guzmonne

-1
from pyspark.sql import Row
myFloatRdd.map(lambda x: Row(x)).toDF()

4
这与已接受的答案有何不同?另外,您应该添加一些解释。 - Banana

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接