PySpark:当列是列表时向DataFrame添加列

11

我读过类似的问题,但没有找到解决我的具体问题的方法。

我有一个列表

l = [1, 2, 3]

以及一个数据框

df = sc.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

我想获取一个新的DataFrame,其中列表l被添加为另一列,即

+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
|     p1|   a|     1   |
|     p2|   b|     2   |
|     p3|   c|     3   |
+-------+----+---------+

使用JOIN的方法,我正在将df连接到一个

 sc.parallelize([[1], [2], [3]])

执行失败了。使用withColumn方法的尝试,例如:

new_df = df.withColumn('new_col', l)

由于列表不是Column对象,因此失败。


3
我认为这是一个很棒的问题,因为它展示了Spark DataFrames API中严重缺失的功能。 - Katya Willard
3个回答

3

因此,从阅读一些有趣的内容这里,我得出结论,你不能真正地将一个随机/任意列附加到给定的DataFrame对象中。似乎你想要的更像是一个zip而不是一个join。我查了一下,发现这个票, 这让我认为你无法使用zip,因为你有DataFrame而不是RDD对象。

我唯一能解决你的问题的方法涉及离开DataFrame对象的世界并返回RDD对象。我还需要为连接创建一个索引,这可能适用于您的用例,也可能不适用。

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])

当我运行new_df.show()时,我得到:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

顺便提一下:我真的很惊讶这没起作用。看起来像是一个外连接?

from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

当我运行new_df.show()时,得到如下结果:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
|     p1|   a|           1|
|     p1|   a|           2|
|     p1|   a|           3|
|     p2|   b|           1|
|     p3|   c|           1|
|     p2|   b|           2|
|     p2|   b|           3|
|     p3|   c|           2|
|     p3|   c|           3|
+-------+----+------------+

请注意,您可以直接使用df.rdd将DataFrame转换为RDD,例如在我的问题中使用。 - mar tin

1
如果product列是唯一的,则考虑以下方法:
原始数据框:
df = spark.sparkContext.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

df.show()

+-------+----+
|product|name|
+-------+----+
|     p1|   a|
|     p2|   b|
|     p3|   c|
+-------+----+

新列(和新索引列):
lst = [1, 2, 3]
indx = ['p1','p2','p3']

从上面的列表中创建一个带有索引的新数据框:
from pyspark.sql.types import *
myschema= StructType([ StructField("indx", StringType(), True),
                       StructField("newCol", IntegerType(), True)                       
                     ])
df1=spark.createDataFrame(zip(indx,lst),schema = myschema)
df1.show()
+----+------+
|indx|newCol|
+----+------+
|  p1|     1|
|  p2|     2|
|  p3|     3|
+----+------+

使用创建的索引将此内容与原始数据框合并:

dfnew = df.join(df1, df.product == df1.indx,how='left')\
          .drop(df1.indx)\
          .sort("product")

获取:

dfnew.show()

+-------+----+------+
|product|name|newCol|
+-------+----+------+
|     p1|   a|     1|
|     p2|   b|     2|
|     p3|   c|     3|
+-------+----+------+

0

这可以通过RDDs实现。

1 将数据框转换为索引RDD:

df_rdd = df.rdd.zipWithIndex().map(lambda row: (row[1], (row[0][0], row[0][1])))
l_rdd = sc.parallelize(l).zipWithIndex().map(lambda row: (row[1], row[0]))

通过索引连接两个RDD,删除索引并重新排列元素:

res_rdd = df_rdd.join(l_rdd).map(lambda row: [row[1][0][0], row[1][0][1], row[1][1]])

3 将结果转换为数据框:

res_df = res_rdd.toDF(['product', 'name', 'new_col'])
res_df.show()

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接