Spark将字符串数组拆分成列。

9

我正在使用Java编写的Spark程序,并且我有一个类似如下结构的数据框:

id  | array_column
-------------------
12  | [a:123, b:125, c:456]
13  | [a:443, b:225, c:126]

我想要按照相同的id拆分array_column,但是explode不起作用,因为我希望数据框变成:
id  | a  | b  | c
-------------------
12  |123 |125 | 456 
13  |443 |225 | 126

你的映射中的元素数量是固定的还是可以更改的? - Artem Astashov
@ArtemAstashov 数字不是固定的,但如果需要,可以使用大数字来阻止它。 - Ofir
3个回答

7
以下方法适用于变长列表的 array_column。该方法使用 explode 来扩展 array_column 中字符串元素的列表,然后使用 : 将每个字符串元素分成两个不同的列 col_namecol_val。最后,使用一个分组并转置数据来将数据转换为所需的格式。
以下示例使用 pyspark api,但可以轻松地翻译为 java/scala apis,因为它们是相似的。假设您的数据集在名为 input_df 的 dataframe 中。
from pyspark.sql import functions as F

output_df = (
    input_df.select("id",F.explode("array_column").alias("acol"))
            .select(
                "id",
                F.split("acol",":")[0].alias("col_name"),
                F.split("acol",":")[1].cast("integer").alias("col_val")
            )
            .groupBy("id")
            .pivot("col_name")
            .max("col_val")
)

如果这对你有效,请告诉我。


5

一个类似于ggordon在Java中的答案的方法:

import static org.apache.spark.sql.functions.*;

Dataset<Row> df = ...

df.withColumn("array_column", explode(col("array_column")))
        .withColumn("array_column", split(col("array_column"), ":"))
        .withColumn("key", col("array_column").getItem(0))
        .withColumn("value", col("array_column").getItem(1))
        .groupBy(col("id"))
        .pivot(col("key"))
        .agg(first("value")) //1
        .show();

输出:

+---+---+---+---+
| id|  a|  b|  c|
+---+---+---+---+
| 12|456|225|126|
| 11|123|125|456|
+---+---+---+---+

我假设id和数组中的关键字段组合是唯一的。这就是为什么在//1处使用的聚合函数是first的原因。如果这个组合不是唯一的,那么聚合函数可以改为collect_list,以便获得所有匹配值的数组。

1

从列中的字符串中提取列名:

  • 创建一个正确的JSON字符串(用引号符号围绕json对象和值)
  • 使用此列创建模式
  • 创建结构并将其分解为列

输入示例:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(12, ['a:123', 'b:125', 'c:456']),
     (13, ['a:443', 'b:225', 'c:126'])],
    ['id', 'array_col'])

df.show(truncate=0)
# +---+---------------------+
# |id |array_col            |
# +---+---------------------+
# |12 |[a:123, b:125, c:456]|
# |13 |[a:443, b:225, c:126]|
# +---+---------------------+

脚本:

df = df.withColumn("array_col", F.expr("to_json(str_to_map(array_join(array_col, ',')))"))
json_schema = spark.read.json(df.rdd.map(lambda row: row.array_col)).schema
df = df.withColumn("array_col", F.from_json("array_col", json_schema))
df = df.select("*", "array_col.*").drop("array_col")

df.show()
# +---+---+---+---+
# | id|  a|  b|  c|
# +---+---+---+---+
# | 12|123|125|456|
# | 13|443|225|126|
# +---+---+---+---+

1
非常聪明的技巧,通过将数据转换为JSON格式并根据模式重建数据框架。对此给予加分。 - Azhar Khan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接