将Spark数据框中的字符串列拆分为多个列

110

我看到有人建议使用Dataframe.explode这个方法来做这件事情,但是它会导致产生比原始数据框更多的行,这不是我想要的。我只是想做一个Dataframe中非常简单的等价操作:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

它接受类似以下内容的东西:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

并将其转换为以下内容:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

我知道pyspark.sql.functions.split(),但它会产生一个嵌套的数组列,而不是像我想要的两个顶级列。

理想情况下,我希望这些新列也有名称。

5个回答

167

pyspark.sql.functions.split() 是正确的方法 - 您只需要将嵌套的ArrayType列展平为多个顶级列。 在这种情况下,每个数组仅包含2个项目,这非常容易。 您只需使用 Column.getItem() 将数组的每个部分作为单独的列检索出来即可:

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

结果将会是:

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

我不确定在嵌套数组行与行不具有相同大小的一般情况下如何解决这个问题。


3
有没有办法将剩余的项目放在单独的一列中?例如,在第三列中使用split_col.getItem (2-n)。我猜想,像上面的循环为所有项目制作列,然后连接它们可能有效,但我不知道这是否非常高效。 - Chris
使用 df.withColumn('NAME_remaining', pyspark.sql.functions.split(df[my_str_col'],'-',3).getItem(2) 来获取剩余项。https://spark.apache.org/docs/latest/api/sql/index.html - Michael Møldrup
我发现,如果您尝试将拆分项之一重新分配给原始列,则必须在拆分之前使用withColumnRenamed()重命名原始列,以避免与https://issues.apache.org/jira/browse/SPARK-14948相关的错误。 - Steve
你如何执行一个分割,使得分割的第一部分是列名,第二部分是列值? - Rachana Gandhi

54

这里提供了一种通用解决方案,不需要事先知道数组长度、使用collect或使用udf。不幸的是,这只适用于spark 2.1及以上版本,因为它要求使用posexplode函数。

假设您有以下DataFrame:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

letters 列拆分,然后使用 posexplode 将结果数组与数组中的位置一起展开。接下来使用 pyspark.sql.functions.expr 获取该数组中索引为pos的元素。

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

现在我们从这个结果创建两个新列。第一个是我们新列的名称,它将是letter和数组中的索引的连接。第二列将是数组中相应索引处的值。我们通过利用pyspark.sql.functions.expr的功能来获得后者,该功能允许我们使用列值作为参数

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

现在我们可以通过对num进行groupBy并对DataFrame进行pivot来完成。将所有内容组合起来,我们得到:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+

1
我尝试使用3909个元素对大约1.7M个原始行进行拆分,但经过一个小时后速度太慢或无法完成。 - osuwireless

20

这里是另一种方法,如果您想使用分隔符拆分字符串。

import pyspark.sql.functions as f

df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
|   value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+

df_split = df.select(f.split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"])

df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+

我认为这种来回转换到RDD不会减缓你的速度...另外,不要担心最后一个模式规范:它是可选的,你可以通过将解决方案推广到具有未知列大小的数据来避免它。


1
我该如何在Scala中实现这个?我在使用flatMap lambda函数时遇到了困难。 - KillerSnail
1
请注意,模式是以正则表达式的形式给出的,因此您需要使用\来表示特殊字符。 - moshe beeri
1
如果您不想在表达式中引用 df,可以将列的名称传递给 split,即 df.select(f.split("value",":"))... - Michele Piccolini
@moshebeeri 你救了我! - diman82
如果有多列(“value”),那该怎么办?在这种情况下,flatMap会如何表现? - JoyfulPanda
通常情况下,当Spark无法唯一地识别一个列时,会引发此错误:pyspark.sql.utils.AnalysisException: Reference 'value' is ambiguous, could be: value, value. - Luca Soato

2

我们可以使用Column[i]代替Column.getItem(i)
此外,在大型数据框中,enumerate非常有用。

from pyspark.sql import functions as F
  • 保留原列:

    for i, c in enumerate(['new_1', 'new_2']):
        df = df.withColumn(c, F.split('my_str_col', '-')[i])
    

    或者

    new_cols = ['new_1', 'new_2']
    df = df.select('*', *[F.split('my_str_col', '-')[i].alias(c) for i, c in enumerate(new_cols)])
    
  • 替换原列:

    for i, c in enumerate(['new_1', 'new_2']):
        df = df.withColumn(c, F.split('my_str_col', '-')[i])
    df = df.drop('my_str_col')
    

    或者

    new_cols = ['new_1', 'new_2']
    df = df.select(
        *[c for c in df.columns if c != 'my_str_col'],
        *[F.split('my_str_col', '-')[i].alias(c) for i, c in enumerate(new_cols)]
    )
    

2

我了解您的痛苦。使用split()函数可以起到作用,但也可能导致错误。

让我们对您提供的数据框进行微小的更改:

df = spark.createDataFrame([('1:"a:3":2001',),('2:"b":2002',),('3:"c":2003',)],["value"]) 

df.show()

+------------+
|       value|
+------------+
|1:"a:3":2001|
|  2:"b":2002|
|  3:"c":2003|
+------------+

如果按照上述方式尝试将split()应用于此内容:

df_split = df.select(split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"]).show()

您将会得到:

IllegalStateException: 输入行的值与模式不匹配。 模式需要4个字段,但只提供了3个值。

那么,有没有更加优雅的解决方法呢?我很高兴有人向我指出这一点。pyspark.sql.functions.from_csv()是您的好朋友。

以上面的示例df为例:

from pyspark.sql.functions import from_csv

# Define a column schema to apply with from_csv()
col_schema = ["col1 INTEGER","col2 STRING","col3 INTEGER"]
schema_str = ",".join(col_schema)

# define the separator because it isn't a ','
options = {'sep': ":"}

# create a df from the value column using schema and options
df_csv = df.select(from_csv(df.value, schema_str, options).alias("value_parsed"))
df_csv.show()

+--------------+
|  value_parsed|
+--------------+
|[1, a:3, 2001]|
|  [2, b, 2002]|
|  [3, c, 2003]|
+--------------+

然后,我们可以轻松地将df展平以将值放入列中:

df2 = df_csv.select("value_parsed.*").toDF("col1","col2","col3")
df2.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1| a:3|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+

没有中断。数据正确解析。生活很美好。来一杯啤酒。

在split()方法中使用这个正则表达式也可以解决问题- : - Mohana B C

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接