如何在PySpark中按多列进行分组并将其收集到列表中?

3
这里是我的问题: 我有一个RDD:
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]

rdd= sc.parallelize (a)

然后我尝试:
rdd.map(lambda x: (x[0],x[1],x[2], list(x[3:])))

.toDF(["col1","col2","col3","col4"])

.groupBy("col1","col2","col3")

.agg(collect_list("col4")).show

最终,我应该找到这个:
[col1,col2,col3,col4]=[u'PNR1',u'TKT1',u'TEST',[[u'a2',u'a3'][u'a5',u'a6'][u'a8',u'a9']]]
但问题在于我无法收集一个列表。
如果有人能帮助我,我将非常感激。

“我无法收集列表”是什么意思? - eliasah
该函数collect_list无法接收一个列表。我尝试收集一个列表的列表。 - Carlos Lopez Sobrino
你正在使用哪个版本的Spark? - eliasah
Spark版本1.6.2 - Carlos Lopez Sobrino
1
你能切换到Spark 2+吗?Spark 1.6使用Hive UDAF执行collect_list,而在Spark 2+中已经重新实现以接受列表的列表。 - eliasah
很抱歉,由于这个平台有很多自动化流程,如果我们升级的话,就需要迁移每一个流程... :( - Carlos Lopez Sobrino
3个回答

2

我终于找到了一个解决方案,虽然不是最好的方法,但我可以继续工作...

from pyspark.sql.functions import udf
from pyspark.sql.functions import *

def example(lista):
    d = [[] for x in range(len(lista))]
    for index, elem in enumerate(lista):
      d[index] = elem.split("@")
    return d
example_udf = udf(example, LongType())

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]

rdd= sc.parallelize (a)

df = rdd.toDF(["col1","col2","col3","col4","col5"])

df2=df.withColumn('col6', concat(col('col4'),lit('@'),col('col5'))).drop(col("col4")).drop(col("col5")).groupBy([col("col1"),col("col2"),col("col3")]).agg(collect_set(col("col6")).alias("col6"))

df2.map(lambda x: (x[0],x[1],x[2],example(x[3]))).collect()

它提供了:

[(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])]

希望这个解决方案能帮助其他人。

感谢您所有的回答。


1
这可能可以完成您的工作(或为您提供进一步操作的一些想法)...
一个想法是将您的col4转换为原始数据类型,即字符串:
from pyspark.sql.functions import collect_list
import pandas as pd

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd = sc.parallelize(a)

df = rdd.map(lambda x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"])

df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]

更新(在您自己的答案之后):

我真的认为我上面达到的点足以根据您的需求进行进一步调整,而且我当时没有时间自己去做;所以,在修改我的df定义以摆脱括号后,这只是一个单一列表理解的问题:

df = rdd.map(lambda x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"])

# temp list:
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
ff
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]

# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
ll

这将给出您最初请求的结果:

[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]]  # requested output

与您自己的答案提供的方法相比,这种方法具有一定的优势

  • 它避免了使用Pyspark UDFs,这些UDFs已知速度较慢
  • 所有处理都在最终(希望更小)的聚合数据中完成,而不是在初始(可能更大)的数据中添加和删除列并执行map函数和UDFs

实际上我需要在col4中有一个列表的列表,在你的回答中,我得到了一个字符串类型的(例如a2 a3),而我需要[[a2,a3],[a5,a6],[a8,a9]]。 - Carlos Lopez Sobrino
@CarlosLopezSobrino,最新的答案难道不是你所要求的吗? - desertnaut

0

由于您无法升级到2.x,您唯一的选择是使用RDD API。请将您当前的代码替换为:

rdd.map(lambda x: ((x[0], x[1], x[2]), list(x[3:]))).groupByKey().toDF()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接