Pyspark:将多个数组列拆分为行

87

我有一个数据框,其中有一行和多个列。其中一些列是单值,而其他列是列表。所有的列表列长度都相同。我想将每个列表列拆分成单独的行,同时保持任何非列表列不变。

示例数据框:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

我想要的是:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

如果我只有一个列表列,那么只需执行explode操作即可轻松完成:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

然而,如果我尝试同时对c列进行explode,最终得到的数据框长度将是我想要的平方:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

我想要的是 - 对于每一列,取该列数组中的第n个元素,并将其添加到新行中。我尝试在数据帧的所有列上映射一个爆炸,但那似乎也不起作用:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
4个回答

109

Spark >= 2.4

你可以使用 arrays_zip 函数替代 zip_ udf

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark < 2.4

使用 DataFrames 和 UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

使用 RDDs

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

由于Python通讯开销,这两种方法都效率低下。如果数据大小固定,您可以像这样操作:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

甚至更多:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

这应该比UDF或RDD快得多。支持任意数量的列进行广义化:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))

Spark >= 2.4的解决方案如何实际工作?文档指出,explode输入“应该是数组或映射类型,而不是字符串”,直接引用它引发的异常。 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.explode - ciurlaro
你如何处理不同列中大小不均的列表?要求将较短的列表替换为-1。现在它显示为空值。 - Vikram Ranabhatt
当我使用StringType运行我的代码时,出现了“zip argument #1 must support iteration”的错误。我该怎么解决这个错误? - Eunike Kamase

11

你需要使用flatMap,而不是map,因为你想把每个输入行转换成多个输出行。

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))

如果第一个df有三个值,第二个df有两个值,我们的zip函数会返回两对值而不是三对。请您给予建议。 - Dugini Vijay
将Zip函数用于对象时,它会配对第一个对象的第一个元素与另一个对象的第一个元素,第二个与第二个,以此类推,直到其中一个对象的元素用完为止。在您的情况下,一共是2个值。换句话说,它会将元素配对,直到没有更多项可配对为止。如果您想提出任何建议,我需要知道您希望程序如何处理未配对的元素(例如,您是否希望从第二个集合得到null值?)。此示例中只有一个df。如果您的问题与此不同,最好就直接提出另一个问题。 - David
1
谢谢 @David 的回复。我已经解决了这个问题。使用 Izip 帮助我解决了这个问题。但是还是感谢你的回复,伙计。 - Dugini Vijay

7

一句话(适用于 Spark>=2.4.0):

df.withColumn("bc", arrays_zip("b","c"))
  .select("a", explode("bc").alias("tbc"))
  .select("a", col"tbc.b", "tbc.c").show()

导入所需内容:

from pyspark.sql.functions import arrays_zip


步骤 -

  1. 创建一个名为“bc”的列,该列是bc列的array_zip
  2. bc展开以获取结构体tbc
  3. 选择所需的列abc(所有列都按照要求展开)。

输出:

> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+

0
ps.DataFrame(df[['b','c']].pandas_api().iloc[0].to_dict()).to_spark()\
    .join(df[['a','d']],how='cross').show()

输出:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接