Pyspark数据框:计算数组或列表中的元素数量

11

假设数据框 df 如下:

df.show()

输出:

+------+----------------+
|letter| list_of_numbers|
+------+----------------+
|     A|    [3, 1, 2, 3]|
|     B|    [1, 2, 1, 1]|
+------+----------------+

我想要做的是在列list_of_numbers中计数特定元素的数量。就像这样:

+------+----------------+----+
|letter| list_of_numbers|ones|
+------+----------------+----+
|     A|    [3, 1, 2, 3]|   1|
|     B|    [1, 2, 1, 1]|   3|
+------+----------------+----+

我迄今为止尝试创建udf并且它完美的工作了,但我想知道是否可以在不定义任何udf的情况下实现。

4个回答

8
你可以将数组拆分并过滤出值为1的。然后使用groupBycount
from pyspark.sql.functions import col, count, explode

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .where(col("exploded") == 1)\
    .groupBy("letter", "list_of_numbers")\
    .agg(count("exploded").alias("ones"))\
    .show()
#+------+---------------+----+
#|letter|list_of_numbers|ones|
#+------+---------------+----+
#|     A|   [3, 1, 2, 3]|   1|
#|     B|   [1, 2, 1, 1]|   3|
#+------+---------------+----+

为了保持所有行的数量,即使为0,您可以将exploded列转换为指示器变量。然后进行groupBysum操作。
from pyspark.sql.functions import col, count, explode, sum as sum_

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .withColumn("exploded", (col("exploded") == 1).cast("int"))\
    .groupBy("letter", "list_of_numbers")\
    .agg(sum_("exploded").alias("ones"))\
    .show()

注意,我已经将 pyspark.sql.functions.sum 导入为 sum_,以免覆盖内置的 sum 函数。

不错!这个方法可行,因为我应该使用join将结果与原始的df合并。否则我会失去那些没有one的行 :) - Ala Tarighati
1
@AllaTarighati,我已经发布了一个更新,这样你就可以避免与原始df进行联接。 - pault

5

从pyspark 3+开始,我们可以使用数组转换。

https://mungingdata.com/spark-3/array-exists-forall-transform-aggregate-zip_with/ https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-array-functions-720b8fbfa729

import pyspark.sql.functions as F

df = spark_session.createDataFrame(
    [
        ['A',[3, 1, 2, 3]],
        ['B',[1, 2, 1, 1]]
    ],      
        ['letter','list_of_numbers'])

df1 = df.selectExpr('*','filter(list_of_numbers, x->x=1) as ones_array')
df2 = df1.selectExpr('*', 'size(ones_array) as ones')
df2.show()

+------+---------------+----------+----+
|letter|list_of_numbers|ones_array|ones|
+------+---------------+----------+----+
|     A|   [3, 1, 2, 3]|       [1]|   1|
|     B|   [1, 2, 1, 1]| [1, 1, 1]|   3|
+------+---------------+----------+----+

4
假设列表长度恒定,我能想到的一种方法是:
from operator import add
from functools import reduce
import pyspark.sql.functions as F

df = sql.createDataFrame(
    [
        ['A',[3, 1, 2, 3]],
        ['B',[1, 2, 1, 1]]
    ],      
        ['letter','list_of_numbers'])

expr = reduce(add,[F.when(F.col('list_of_numbers').getItem(x)==1, 1)\
                    .otherwise(0) for x in range(4)])
df = df.withColumn('ones', expr)
df.show()

+------+---------------+----+
|letter|list_of_numbers|ones|
+------+---------------+----+
|     A|   [3, 1, 2, 3]|   1|
|     B|   [1, 2, 1, 1]|   3|
+------+---------------+----+

2
谢谢,但它们不一定具有相同的大小。 - Ala Tarighati

0

上面有一条评论来自Ala Tarighati,说这个解决方案对于长度不同的数组无效。以下是一个UDF,可以解决这个问题。

from operator import add
from functools import reduce
import pyspark.sql.functions as F

df = sql.createDataFrame(
    [
        ['A',[3, 1, 2, 3]],
        ['B',[1, 2, 1, 1]]
    ],      
        ['letter','list_of_numbers'])

df_ones = (
    df.withColumn(
        'ones', 
        reduce(
            add,
            [
                F.when(
                    F.col("list_of_numbers").getItem(x) == F.lit("1"), 1
                ).otherwise(0)
                for x in range(len("drivers"))
            ],
        ),
    )
)
df_ones.show()
+------+---------------+----+
|letter|list_of_numbers|ones|
+------+---------------+----+
|     A|   [3, 1, 2, 3]|   1|
|     B|   [1, 2, 1, 1]|   3|
+------+---------------+----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接