在pyspark中从数据框构建StructType

19

我是一个新的Spark和Python用户,遇到了以下问题:如何从元数据文件构建模式,以便可以应用于我的数据文件。 场景:数据文件(csv格式)的元数据文件包含列及其类型,例如:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

我已成功将其转换为一个类似于以下数据框的数据:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

但是当我尝试使用以下方法将其转换为StructField格式时

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

或者

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

然后稍后将其转换为StructType,使用

schemaFinal = StructType(schemaList)
我遇到了以下错误:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

由于我对数据框架的知识不足,我卡住了,您能否请指教以下如何继续。一旦我的模式准备好,我想使用createDataFrame将其应用于我的数据文件。这个过程必须针对许多表进行,因此我不想硬编码类型,而是使用元数据文件来构建模式然后应用于RDD。

提前感谢。

3个回答

27

字段的参数必须是DataType对象的列表。这样做:

Fields have argument have to be a list of DataType objects. This:


.map(lambda l:([StructField(l.name, l.type, 'true')]))

collect 后生成一个由 tuples (Rows) 的 DataType 组成的 list[list[tuple[DataType]]] 类型的列表,同时需要注意 nullable 参数应为布尔值而非字符串。

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).
< p >在collect之后生成一个由str对象组成的list

你展示的记录的正确格式应该大致如下:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

虽然使用分布式数据结构执行这样的任务是严重的过度设计,更不用说效率低下,但你可以尝试按照以下方式调整你的第一个解决方案:

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])

但是它不是特别安全(eval)。从 JSON / 字典构建模式可能更容易。假设您有一个函数,该函数将类型描述映射到规范类型名称:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

你可以构建以下形式的字典:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

将其提供给StructType.fromJson进行解析:

StructType.fromJson(schema_dict)

我没有创建一个函数,而是使用了df.replace来替换整数、字符串等的出现。我尝试使用第一个选项eval - 但它仍然给我相同的错误。有没有一种方法可以将我的csv数据转换为pyspark中的字典格式? - manmeet
你可以尝试使用 Row.asDict,但仍需要一个映射。 - zero323
我已经定义了一个映射,如下:testDF = schemaLoans.replace(['int','string','timestamp'],['IntegerType()','StringType()','TimestampType()'],'type')。这样可以得到最终的DF,其中包含正确的数据类型。请问在这种情况下我该如何使用Row.asDict?我想要自动化创建每个元数据文件的模式字典的过程,因为我有超过100个文件。谢谢。 - manmeet
好的,目前我只是尝试使用createDataFrame来应用模式,并且我已经更改了数据类型以匹配数据,但现在我遇到了这个错误:TypeError: ...无法接受<type 'unicode'>类型的对象。对此有什么帮助吗? - manmeet
你可以单独提一个问题,在问题中包含 [mcve],然后 @我一个链接吗? 我猜你传递的是字符串而不是行/元组/...。 - zero323
显示剩余3条评论

1
以下步骤可用于更改数据类型对象。
data_schema=[
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
]



final_struct=StructType(fields=data_schema)

df=spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)



df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

-2
val columns: Array[String] = df1.columns
val reorderedColumnNames: Array[String] = df2.columns //or do the reordering you want
val result: DataFrame = dataFrame.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)

你能解释一下你的答案吗? - rassar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接