追加到 PySpark 数组列

7
我想检查列的值是否在某些边界内。如果它们不在,我将向数组列"F"附加一些值。这是我目前的代码:
df = spark.createDataFrame(
    [
        (1, 56), 
        (2, 32),
        (3, 99)
    ],
    ['id', 'some_nr'] 
)

df = df.withColumn( "F", F.lit( None ).cast( types.ArrayType( types.ShortType( ) ) ) )

def boundary_check( val ):
  if (val > 60) | (val < 50):
    return 1

udf  = F.udf( lambda x: boundary_check( x ) ) 

df =  df.withColumn("F", udf(F.col("some_nr")))
display(df)

然而,我不知道如何将内容添加到数组中。目前,如果我对“df”进行另一个边界检查,它将简单地覆盖“F”中之前的值...

2个回答

7
请参考这里的 pyspark.sql.functions 下的 array_union 函数:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.functions.array_union。这样可以避免使用 udf,从而保留 Spark 并行化的优势。代码看起来像这样:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row
import pyspark.sql.functions as f


conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2="a", c3=10),
                            Row(c1=["b", "a", "c"], c2="d", c3=20)])
df.show()
+---------+---+---+
|       c1| c2| c3|
+---------+---+---+
|[b, a, c]|  a| 10|
|[b, a, c]|  d| 20|
+---------+---+---+

df.withColumn(
    "output_column", 
    f.when(f.col("c3") > 10, 
           f.array_union(df.c1, f.array(f.lit("1"))))
     .otherwise(f.col("c1"))
).show()
+---------+---+---+-------------+
|       c1| c2| c3|output_column|
+---------+---+---+-------------+
|[b, a, c]|  a| 10|    [b, a, c]|
|[b, a, c]|  d| 20| [b, a, c, 1]|
+---------+---+---+-------------+

作为一个旁注,这个函数可以作为一个逻辑联合来使用,因此如果您想附加一个值,您需要确保这个值是唯一的,以便它总是被添加。否则,请查看其他数组函数在这里:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.functions.array 注意:大多数数组函数需要您的Spark版本>2.4
编辑(按照评论请求): withColumn方法只允许您一次处理一列,因此您需要使用一个新的withColumn,最好预先定义您的逻辑语句,以便于两个withColumn查询。
logical_gate = (f.col("c3") > 10)

(
    df.withColumn(
        "output_column", 
        f.when(logical_gate, 
               f.array_union(df.c1, f.array(f.lit("1"))))
         .otherwise(f.col("c1")))
      .withColumn(
        "c3",
        f.when(logical_gate,
               f.lit(None))
         .otherwise(f.col("c3")))
      .show()
)
+---------+---+----+-------------+
|       c1| c2|  c3|output_column|
+---------+---+----+-------------+
|[b, a, c]|  a|  10|    [b, a, c]|
|[b, a, c]|  d|null| [b, a, c, 1]|
+---------+---+----+-------------+

非常好,谢谢!有没有办法在一个when子句中执行两个操作?比如说当c3 > 10时,在数组c1中追加1,并用None替换c3。 - Lossa

0

Spark 3.4+ 版本开始,您可以使用 array_append 函数:

from pyspark.sql import functions as F

df = spark.createDataFrame([(10, ['a', 'b', 'c']), (20, ['a', 'b', 'c'])], ['c1', 'c2'])
df.show()
# +---+---------+
# | c1|       c2|
# +---+---------+
# | 10|[a, b, c]|
# | 20|[a, b, c]|
# +---+---------+

df = df.withColumn('c2', F.when(F.col('c1') > 15, F.array_append('c2', 'd')).otherwise(F.col('c2')))
df.show()
# +---+------------+
# | c1|          c2|
# +---+------------+
# | 10|   [a, b, c]|
# | 20|[a, b, c, d]|
# +---+------------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接