Spark - 创建嵌套的DataFrame

16

我刚开始使用PySpark并在创建具有嵌套对象的数据框方面遇到了问题。

这是我的例子。

我有用户。

$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}

用户有订单。

$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

我喜欢加入它,以获得这样一个结构,其中订单是嵌套在用户中的数组。

$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

我该怎么做呢?有没有类似嵌套连接之类的东西?

>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> order =  sqlContext.read.json("order.json")
>>> order.printSchema();
root
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- userid: long (nullable = true)

>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

编辑: 我知道可以使用join和foldByKey来实现这个功能,但是否有更简单的方法?

编辑2: 我正在使用@zero323的解决方案。

def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType = "left_outer"):
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")

我添加了第二个嵌套结构“lines”

>>> lines =  sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
 |-- id: long (nullable = true)
 |-- orderid: long (nullable = true)
 |-- product: string (nullable = true)

orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _1: long (nullable = true)
 |    |    |    |    |-- _2: long (nullable = true)
 |    |    |    |    |-- _3: string (nullable = true)

这个问题是关于行中的列名丢失了,有什么想法吗?

编辑3: 我尝试手动指定模式。

from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print new_schema

grouped =  lines.rdd.groupBy(lambda r: r.orderid)
grouped =  grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)

错误:

TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>
3个回答

32

这将仅在Spark 2.0或更高版本中起作用

首先我们需要导入一些内容:

from pyspark.sql.functions import struct, collect_list

剩下的就是简单的聚合和连接:

orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")

combined = users.join(
    orders
        .groupBy("userId")
        .agg(collect_list(struct(*orders.columns)).alias("orders"))
        .withColumnRenamed("userId", "id"), ["id"])

对于示例数据,结果为:

combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders                     |
+---+-----+---------------------------+
|1  |UserA|[[1,202.3,1], [2,343.99,1]]|
|2  |UserB|[[3,399.99,2]]             |
+---+-----+---------------------------+

使用模式:

combined.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

以及JSON表示:

for x in combined.toJSON().collect():
    print(x)     
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

请注意,这个方法之所以可行是因为尽管“users”很大(因为它在数据框中),但特定用户的订单数量足够小,可以保存在一个集合中。如果不是这种情况呢?如果订单是其他东西,比如出于论证目的,具有相同发色的用户,无法保存在一个集合中怎么办?你是否被迫收集发色并逐个进行处理,或者可能使用笛卡尔积? - oneirois
@oneirois 简而言之 - 整个想法不可行。行是并行性的最小单位,不能被分割或部分溢出。您可以使用 DataFrameWriter 为每个分组因子创建单独的文件,而无需创建嵌套结构 - 这将更好地扩展。 - zero323
这正是我最终所做的 :) - oneirois

-1

将嵌套的数据框转换为普通的数据框,请使用以下代码:

dff = df.select("包含多个列的列名.*")


1
请不要使用Pandas!这会调用Spark的collect()方法!它非常缓慢且不是分布式的,因为所有数据都将被带回到一个单点——即Spark Driver。 - prossblad

-1

首先,您需要使用userid作为第二个DataFrame的连接键:

user.join(order, user.id == order.userid)

然后,您可以使用map步骤将结果记录转换为所需的格式。


不完全是。Map 是不够的。 如果我连接用户和订单,我将有3条记录。(而我只想要2条)。所以我还需要某种聚合(例如 foldByKey)。 - Maciek Bryński

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接