如何使用pyspark collect_list函数检索所有列

3

我有一个pyspark 2.0.1。我正在尝试对我的数据框进行分组,并从我的数据框中检索所有字段的值。我发现

z=data1.groupby('country').agg(F.collect_list('names')) 

这段代码将为我提供国家和名称属性的值,并为名称属性提供列标题collect_list(names)。但是对于我的工作,我有一个包含约15个列的数据框,并且我将运行一个循环,每次在循环内更改groupby字段,并需要所有剩余字段的输出。请问您能否建议我如何使用collect_list()或任何其他pyspark函数来完成此操作?

我也尝试过这段代码

from pyspark.sql import functions as F 
fieldnames=data1.schema.names 
names1= list() 
for item in names: 
   if item != 'names': 
     names1.append(item) 
 z=data1.groupby('names').agg(F.collect_list(names1)) 
 z.show() 

但收到错误信息。
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace: py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist 

我尝试了这段代码 `from pyspark.sql import functions as F fieldnames=data1.schema.names names1= list() for item in names: if item != 'names': names1.append(item)
#print itemz=data1.groupby('names').agg(F.collect_list(names1)) z.show()但是出现了错误信息Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace: py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist`。
- Python Learner
有什么建议吗? - Python Learner
请不要在评论区发布代码!请将您的帖子更新以包括代码段。 - desertnaut
4个回答

9

在调用groupBy之前,使用struct组合列

假设您有一个数据框

df = spark.createDataFrame(sc.parallelize([(0,1,2),(0,4,5),(1,7,8),(1,8,7)])).toDF("a","b","c")

df = df.select("a", f.struct(["b","c"]).alias("newcol"))
df.show()
+---+------+
|  a|newcol|
+---+------+
|  0| [1,2]|
|  0| [4,5]|
|  1| [7,8]|
|  1| [8,7]|
+---+------+
df = df.groupBy("a").agg(f.collect_list("newcol").alias("collected_col"))
df.show()
+---+--------------+
|  a| collected_col|
+---+--------------+
|  0|[[1,2], [4,5]]|
|  1|[[7,8], [8,7]]|
+---+--------------+

聚合操作只能在单个列上进行。

在聚合之后,您可以收集结果并对其进行迭代,以分离组合的列并生成索引字典。或者您可以编写一个UDF来分离组合的列。

from pyspark.sql.types import *
def foo(x):
    x1 = [y[0] for y in x]
    x2 = [y[1] for y in x]
    return(x1,x2)

st = StructType([StructField("b", ArrayType(LongType())), StructField("c", ArrayType(LongType()))])
udf_foo = udf(foo, st)
df = df.withColumn("ncol", 
                  udf_foo("collected_col")).select("a",
                  col("ncol").getItem("b").alias("b"), 
                  col("ncol").getItem("c").alias("c"))
df.show()

+---+------+------+
|  a|     b|     c|
+---+------+------+
|  0|[1, 4]|[2, 5]|
|  1|[7, 8]|[8, 7]|
+---+------+------+

非常感谢ashwinids。但是我的b和c列应该与a列分别识别,而不是通过合并b和c来识别collected_col。 - Python Learner
实际上我正在尝试这个,是因为我在这里提到了我的问题。 - Python Learner
有什么建议吗? - Python Learner
谢谢ashwinids。我收到了一个错误消息,StructType未定义。 - Python Learner
在顶部添加 from pyspark.sql.types import * 行以导入数据类型。 - pauli

3

实际上,我们可以在pyspark 2.2中完成此操作。

首先,我们需要创建一个常量列(“Temp”),通过该列(“Temp”)进行groupBy,并通过传递可迭代的* exprs应用agg,其中包含collect_list表达式。

以下是代码:

import pyspark.sql.functions as ftions
import functools as ftools

def groupColumnData(df, columns):
      df = df.withColumn("Temp", ftions.lit(1))
      exprs = [ftions.collect_list(colName) for colName in columns]
      df = df.groupby('Temp').agg(*exprs)
      df = df.drop("Temp")
      df = df.toDF(*columns)
      return df

输入数据:

df.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  1|  2|
|  0|  4|  5|
|  1|  7|  8|
|  1|  8|  7|
+---+---+---+

输出数据:

df.show()

    +------------+------------+------------+
    |           a|           b|           c|
    +------------+------------+------------+
    |[0, 0, 1, 1]|[1, 4, 7, 8]|[2, 5, 8, 7]|
    +------------+------------+------------+

1
在Spark 2.4.4和Python 3.7中(我猜这也适用于之前的Spark和Python版本)-- 我的建议基于pauli's的答案,不要创建结构体然后使用agg函数,而是在collect_list内部创建结构体:
df = spark.createDataFrame([(0,1,2),(0,4,5),(1,7,8),(1,8,7)]).toDF("a","b","c")
df.groupBy("a").agg(collect_list(struct(["b","c"])).alias("res")).show()

结果:

+---+-----------------+
|  a|res              |
+---+-----------------+
|  0|[[1, 2], [4, 5]] |
|  1|[[7, 8], [8, 7]] |
+---+-----------------+

0

我刚刚使用了Concat_ws函数,它完全正常。

> from pyspark.sql.functions import * df =
> spark.createDataFrame([(0,1,2),(0,4,5),(1,7,8),(1,8,7)]).toDF("a","b","c")
> df.groupBy('a').agg(collect_list(concat_ws(',','b','c'))).alias('r').show()

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接