如何在pyspark中使用groupBy、collect_list、arrays_zip和explode来解决某些业务问题

3

我是pyspark世界的新手。
想要在列days上将两个数据帧dfdf_sd进行连接。在连接时,还应该使用df数据帧中的Name列。如果来自df数据帧的Namedays组合没有匹配值,则应为null。请参见下面的代码和期望输出以更好地理解。

 import findspark

findspark.init("/opt/spark")

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

Mydata = Row("Name", "Number", "days")

spark = SparkSession \
    .builder \
    .appName("DataFrame Learning") \
    .getOrCreate()

sqlContext = SQLContext(spark)

mydata1 = Mydata("A", 100, 1)
mydata2 = Mydata("A", 200, 2)
mydata3 = Mydata("B", 300, 1)
mydata4 = Mydata("B", 400, 2)
mydata5 = Mydata("B", 500, 3)
mydata6 = Mydata("C", 600, 1)
myDataAll = [mydata1, mydata2, mydata3, mydata4, mydata5, mydata6]

STANDARD_TENORS = [1, 2, 3]

df_sd = spark.createDataFrame(STANDARD_TENORS, IntegerType())
df_sd = df_sd.withColumnRenamed("value", "days")

df_sd.show()

df = spark.createDataFrame(myDataAll)
df.show()
+----+
# |days|
# +----+
# |   1|
# |   2|
# |   3|
# +----+
# 
# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# +----+------+----+

请查看下面来自join的预期结果。

# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   A|Null  |   3|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# |   C|Null  |   2|
# |   C|Null  |   3|
# +----+------+----+


1
df_sd会一直是一个小的日期列表吗?你的spark版本是多少? - murtihash
是的,df_sd将始终是一个小列表。 - GPopat
1个回答

1
如果 df_sd 不是一个非常庞大的列表,并且你使用的是 spark2.4,你可以通过在 df 中创建一个新列来包含天数列表(1,2,3),然后使用 groupBycollect_listarrays_zipexplode 来实现。在 groupBy 之前使用 orderBy 来确保列表被正确地收集。
df.show()
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
+----+------+----+
STANDARD_TENORS #->  [1, 2, 3] 
                #-> should be ordered



from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
  .orderBy("Name","days")\
  .groupBy("Name").agg(F.collect_list("Number").alias("Number")\
                      ,F.first("days2").alias("days"))\
  .withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
  .select("Name","zipped.*").orderBy("Name","days").show()


+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   200|   1|
|   A|   100|   2|
|   A|  null|   3|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
|   C|  null|   2|
|   C|  null|   3|
+----+------+----+

如果您想使用 join,可以以类似的方式进行:
from pyspark.sql import functions as F
df_sd.agg(F.collect_list("days").alias("days")).join(\
df.orderBy("Name","days").groupBy("Name")\
.agg(F.collect_list("Number").alias("Number"),F.collect_list("days").alias("days1")),\
                      F.size("days")>=F.size("days1")).drop("days1")\
     .withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
     .select("Name","zipped.*")\
     .orderBy("Name","days")\
     .show()

更新:

为了处理Number中的任何订单任何现有价值,我可以使代码更加简洁,但我将其保持原样,以便您可以看到我使用的所有列以理解逻辑。如有任何问题,请随时提问。

df.show()
#newsampledataframe
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   3|
+----+------+----+
#STANDARD_TENORS = [1, 2, 3]

from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
  .groupBy("Name").agg(F.collect_list("Number").alias("col1")\
                      ,F.first("days2").alias("days2"),F.collect_list("days").alias("x"))\
  .withColumn("days3", F.arrays_zip(F.col("col1"),F.col("x")))\
  .withColumn("days4", F.array_except("days2","x"))\
  .withColumn("day5", F.expr("""transform(days4,x-> struct(bigint(-1),x))"""))\
  .withColumn("days3", F.explode(F.array_union("days3","day5"))).select("Name","days3.*")\
  .withColumn("Number", F.when(F.col("col1")==-1, F.lit(None)).otherwise(F.col("col1"))).drop("col1")\
  .select("Name", "Number", F.col("x").alias("days"))\
  .orderBy("Name","days")\
  .show(truncate=False)

非常感谢 @Mohammad Murtaza Hashmi,这对我有用!如果时间允许的话,您可以再解释一下逻辑,这对像我这样的新学习者会有所帮助(当然,我会在使用的函数上搜索一下谷歌 :-) )。 - GPopat
1
是的,首先我们使用orderyby来确保我们按正确的顺序收集数据,然后使用groupby Name来按组收集列表,并获取我们创建的days列表。现在我们将会有类似[100,200]的Numbers,但你的days列表将始终是[1,2,3]。因此,一旦我们使用arrays_zip将它们进行合并并展开,Number就不会有第三个值,所以3天将自动在Numbers中创建一个空值。然后我们可以从合并的列中选择我们需要的列。主要逻辑位于arrays_zip中,这一点非常重要,希望你能理解。 - murtihash
完整文档请参考:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html。 - murtihash
1
似乎解决方案存在一个问题。如果将输入从C,600,1更改为C,600,3,它会给出错误的输出。@Mohammad Murtaza Hashmi,请您检查一下好吗? - GPopat
1
检查更新。它将适用于任何情况。 - murtihash
太好了!它已经修复了!非常感谢 @Mohammad Murtaza Hashmi - GPopat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接