如何在Pyspark数据框的列中创建一个列表。

3

我可以帮助您进行翻译。以下是需要翻译的内容:

我有一个数据框,其中包含以下数据:

df.show()

    +-----+------+--------+
    | id_A| idx_B| B_value|
    +-----+------+--------+
    |    a|     0|       7|
    |    b|     0|       5|
    |    b|     2|       2|
    +-----+------+--------+

假设B有3个可能的索引,我想创建一个表格,将所有的索引和值合并成一个列表(或numpy数组),看起来像这样:
final_df.show()

    +-----+----------+
    | id_A|  B_values|
    +-----+----------+
    |    a| [7, 0, 0]|
    |    b| [5, 0, 2]|
    +-----+----------+

我已经做到了这一点:

from pyspark.sql import functions as f

temp_df = df.withColumn('B_tuple', f.struct(df['idx_B'], df['B_value']))\
            .groupBy('id_A').agg(f.collect_list('B_tuple').alias('B_tuples'))
temp_df.show()

    +-----+-----------------+
    | id_A|         B_tuples|
    +-----+-----------------+
    |    a|         [[0, 7]]|
    |    b| [[0, 5], [2, 2]]|
    +-----+-----------------+

但是现在我无法运行一个合适的udf函数来将temp_df转换为final_df

是否有更简单的方法来完成这个任务?

如果没有,那么我应该使用什么样的正确函数来完成转换?

2个回答

3

我已经找到了一个解决方案,

def create_vector(tuples_list, size):
    my_list = [0] * size
    for x in tuples_list:
        my_list[x["idx_B"]] = x["B_value"]
    return my_list

create_vector_udf = f.udf(create_vector, ArrayType(IntegerType()))

final_df = temp_df.with_column('B_values', create_vector_udf(temp_df['B_tuples'])).select(['id_A', 'B_values'])

final_df.show()

    +-----+----------+
    | id_A|  B_values|
    +-----+----------+
    |    a| [7, 0, 0]|
    |    b| [5, 0, 2]|
    +-----+----------+

不错的解决方案。我对UDF还没有太多经验,所以这是一个很好的例子,可以让我了解它们何时有用 :) - Florian

1

如果您已经知道数组的size,则可以在不使用udf的情况下完成此操作。

利用pivot()的可选第二个参数:values。这将接受一个

值列表,这些值将被转换为输出DataFrame中的列

因此,对id_A列进行groupBy,并在idx_B列上旋转DataFrame。由于可能不是所有索引都存在,因此可以将range(size)作为values参数传递。

import pyspark.sql.functions as f
size = 3
df = df.groupBy("id_A").pivot("idx_B", values=range(size)).agg(f.first("B_value"))
df = df.na.fill(0)
df.show()
#+----+---+---+---+
#|id_A|  0|  1|  2|
#+----+---+---+---+
#|   b|  5|  0|  2|
#|   a|  7|  0|  0|
#+----+---+---+---+

数据中没有的索引将默认为空值null,因此我们调用na.fill(0)来设置默认值。

一旦您的数据格式为此,您只需从列中创建数组即可:

df.select("id_A", f.array([f.col(str(i)) for i in range(size)]).alias("B_values")).show()
#+----+---------+
#|id_A| B_values|
#+----+---------+
#|   b|[5, 0, 2]|
#|   a|[7, 0, 0]|
#+----+---------+

也许,但是通过指定枢轴的值,您可以获得很大的性能提升。我不确定这与将udf序列化到Python相比如何。这仍然可能是更好的选择。 - pault

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接