类型错误:列不可迭代 - 如何迭代ArrayType()?

16

考虑以下数据框:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

有没有一种方法可以直接修改 ArrayType() 列的 "names" 字段,而不使用 udf,并对每个元素应用一个函数?

例如,假设我想将函数 foo 应用于 "names" 列。 (我将使用函数 str.upper 的示例只是为了说明目的,但我的问题涉及可应用于可迭代对象元素的任何有效函数。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: 列不可迭代

我可以使用udf来完成这个任务:

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

在这个具体的例子中,我可以通过拆分列、调用 pyspark.sql.functions.upper() 然后执行 groupBycollect_list 来避免使用 udf

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

但是这需要很多代码才能完成一些简单的任务。是否有更直接的方法来使用spark-dataframe函数迭代ArrayType()元素?

2个回答

11

Spark < 2.4中,您可以使用用户定义的函数:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

考虑到explode + collect_list惯用语的高成本,尽管其固有成本较高,但几乎是唯一的首选方法。

Spark 2.4或更高版本中,您可以使用transform*与upper(请参见SPARK-23909):

from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

还可以使用pandas_udf

from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

虽然只有最新的Arrow/PySpark组合支持处理ArrayType列(SPARK-24259SPARK-21187),但此选项应比标准UDF更有效(尤其是在具有较低serde开销的情况下),同时支持任意Python函数。

* 还支持许多其他高阶函数, 包括但不限于filteraggregate。例如,请参见


1
@pault 感谢您的编辑。Spark自己的SQL文档可能是一个更好的目标(尽管它们没有按类别分组)-您觉得呢? - 10465355

1

是的,您可以将其转换为RDD,然后再转换回DF。

>>> df.show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

>>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

谢谢回复。我也知道这种方法,但我正在寻找仅使用spark-dataframe语法的解决方案。您知道将RDD序列化并返回与使用UDF相比如何?我的理解是使用UDF更好,但我没有文档来支持这一点。 - pault
据我所知,一旦数据进入Python,Spark就无法管理工作节点的内存。JVM和Python在单台机器上竞争内存,导致资源受限可能导致某个工作节点失败。 - Bala

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接