使用强制模式将Pyspark RDD转换为DataFrame:值错误

3
我正在使用与本帖末尾所示的模式相对应的 pyspark 进行工作(请注意,有嵌套列表和无序字段),最初从 Parquet 导入为 DataFrame。 根本上,我遇到的问题是无法将此数据处理为 RDD,然后转换回 DataFrame。(我已经查看了几篇相关文章,但仍无法确定我的错误在哪里。)
简单地说,以下代码运行良好(如预期的那样):
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)

当我需要映射RDD时(例如添加字段),事情就无法正常工作。

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
    rowDict = row.asDict()
    return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)

上面的代码会产生以下异常,其中XXX是一个整数的替代符号,该整数在每次运行时都会更改(例如,我已经看到了1、16、23等)。
File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in 
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`

鉴于这些信息,“第二个代码块”中是否存在明显的错误?(我注意到tripRDD是rdd.RDD类,而tripRDDNew是rdd.PipelinedRDD类,但我认为这不应该是问题。)(我还注意到tripRDD的模式未按字段名称排序,而tripRDDNew的模式按字段名称排序。同样,我不认为这会有问题。)
架构:
root
 |-- foo: struct (nullable = true)
 |    |-- bar_1: integer (nullable = true)
 |    |-- bar_2: integer (nullable = true)
 |    |-- bar_3: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |    |-- bar_4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |-- qux: integer (nullable = true)
 |-- corge: integer (nullable = true)
 |-- uier: integer (nullable = true)`
1个回答

2

正如帖子中所提到的,原始模式具有未按字母顺序排序的字段。这就是问题所在。映射函数中使用.asDict()将导致生成的RDD的字段被排序。tripRDDNew的字段顺序与调用createDataFrame时的模式冲突。ValueError的结果是尝试将整数字段(即示例中的qux、corge或uier)解析为StructType。

(另外:令人惊讶的是,createDataFrame要求模式字段与RDD字段具有相同的顺序。您应该需要一致的字段名称或一致的字段顺序,但要求两者都具备似乎有些过分。)

(第二个问题:DataFrame中存在非字母顺序的字段有些不正常。例如,sc.parallelize()将在分发数据结构时自动按字母顺序排序字段。似乎应该在从parquet文件导入DataFrame时对字段进行排序。研究为什么不是这种情况可能很有趣。)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接