PySpark: 在Spark 2.3中的Arrays_zip等效方法

7
如何在Spark 2.3中编写与arrays_zip等效的函数?
源代码来自Spark 2.4。
def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

如何在PySpark中实现类似功能?

你可以尝试测试以下代码:f=lambda x,y:list(zip(x,y)); myudf = F.udf(f,ArrayType(StructType([StructField('vals1',IntegerType(),False),StructField('vals2',IntegerType(),False)]))),之后使用df.select(myudf(F.col('vals1'),F.col('vals2'))).collect()进行操作。如果你没有将引入命名为“F”,则删除前缀“F”。 - anky
3个回答

1
您可以使用 UDF 来获得与 arrays_zip 相同的功能。请注意,此方法需要列类型相同(在此例中为 IntegerType)。如果列类型有任何差异,请在使用 UDF 之前将其转换为通用类型。
from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))

它可以像使用arrays_zip一样使用,例如:
df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()

这段代码对你来说运行了吗?我看到一个奇怪的错误:net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for builtins.iter) - bp2010
@bp2010:我现在无法尝试代码(需要等到今晚在我的时区),但错误与返回类型不匹配的UDF声明有关。我更改了答案中的代码,请尝试是否有效。(如果不使用带有return list([list(z) for z in zip(*args)])的UDF,那么最有可能会起作用,但我认为没有必要这样做。) - Shaido
现在程序可以运行。但是,我试图使用这个函数来解压缩zip文件。但是现在使用这个函数时,我看到了错误:org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(cols). - bp2010
@bp2010:你确定你在使用explode吗?这看起来像是expand的错误。expand适用于结构体,而在这种情况下,zip返回一个数组的数组。这可以通过返回一个结构体数组来解决(请参见andy在问题中的评论),但它不会根据列数动态变化。 - Shaido
是的,我正在使用 explode。我在这里发布了逻辑:https://stackoverflow.com/a/61087359/3213111 我使用 arrays_zip 来确保它是动态的,因为我需要这样做。你有没有想过如何以动态方式处理列? - bp2010
总的来说,我认为这个答案不符合“arrays_zip”的要求;因为返回类型是“返回合并的结构数组...” - bp2010

1
你可以通过创建用户定义的函数来实现这一点。
import pyspark.sql.functions as f
import pyspark.sql.types as t

arrays_zip_ = f.udf(lambda x, y: list(zip(x, y)),  
      t.ArrayType(t.StructType([
          # Choose Datatype according to requirement
          t.StructField("first", t.IntegerType()),
          t.StructField("second", t.StringType())
  ])))

df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])

现在的结果是 spark<=2.3。其中包含HTML标记,不进行解释。
df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

并且使用Spark版本2.4得到结果。
df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

1
以上代码仅适用于2个数组,而arrays_zip可用于任意数量的数组。 - Shaido
这为您提供了假定合并的数据类型的可行性。我们随时可以动态创建此代码。 - Shubham Jain
1
可以将其制作成动态的,使用动态列集,而不是像上面那样固定吗? - bp2010
使用当前函数与数组一起会出现错误:TypeError: <lambda>() missing 1 required positional argument: 'y' - bp2010

0
你可以简单地使用f.array,但是你必须稍后通过索引而不是列名来获取值(这是唯一的区别)。
from pyspark.sql import functions as f

df = df.withColumn('combined', f.array(f.col('col1'), f.col('col2'), f.col('col3')))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接