基于名称,将PySpark列表拆分为多列

5

你好,我正在处理一个稍微有些棘手的文件格式,需要清理以进行未来的处理。我一直在使用Pyspark将数据处理成数据框。

这个文件看起来像这样:

AA 1234  ZXYW
BB A 890
CC B 321
AA 1234  LMNO
BB D 123
CC E 321
AA 1234  ZXYW
CC E 456

每个“AA”记录定义了一组逻辑记录的开头,每行数据都是固定长度的,并且其中编码了我想要提取的信息。至少有20-30种不同的记录类型。它们总是以每行开头的两个字母代码来标识。每个组中可以有1个或多个不同的记录类型(即并非每个组都包含所有记录类型)。
作为第一阶段,我已经成功将记录以以下格式分组:
+----------------+---------------------------------+
|           index|                           result|
+----------------+---------------------------------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|
|               3|[AA 1234  ZXYV,CC B 321]         |
+----------------+---------------------------------+

作为第二阶段,我真的希望将数据导入到dataframe中以下列中:
+----------------+---------------------------------+-------------+--------+--------+
|           index|                           result|           AA|      BB|      CC|
+----------------+---------------------------------+-------------+--------+--------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|AA 1234  ZXYV|BB A 890|CC B 321|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|AA 1234  LMNO|BB D 123|CC E 321|
|               3|[AA 1234  ZXYV,CC B 321]         |AA 1234  ZXYV|    Null|CC B 321|
+----------------+---------------------------------+-------------+--------+--------+

因为在那个时候提取我所需的信息应该是微不足道的。你们有什么建议,我该怎么做呢?非常感谢。
3个回答

7

不需要将数组转换为RDD即可进行分裂的替代方法:

from pyspark.sql import functions as F

udf1 = F.udf(lambda x : x.split()[0])
df.select('index',F.explode('result').alias('id'),udf1(F.col('id')).alias('idtype')).show()

+-----+-------------+------+
|index|           id|idtype|
+-----+-------------+------+
|    1|AA 1234  ZXYV|    AA|
|    1|     BB A 890|    BB|
|    1|     CC B 321|    CC|
|    2|AA 1234  LMNO|    AA|
|    2|     BB D 123|    BB|
|    2|     CC E 321|    CC|
|    3|AA 1234  ZXYV|    AA|
|    3|     CC B 321|    CC|
+-----+-------------+------+ 

df1.groupby('index').pivot('idtype').agg(F.first('id')).join(df,'index').show()

2
您可以使用flatMappivot来实现此目的。从第一阶段的结果开始:
rdd = sc.parallelize([(1,['AA 1234  ZXYV','BB A 890','CC B 321']),
                      (2,['AA 1234  LMNO','BB D 123','CC E 321']),
                      (3,['AA 1234  ZXYV','CC B 321'])])

df = rdd.toDF(['index', 'result'])

你可以使用flatMap将数组拆分为多行,并将两个字母的标识符提取到单独的列中。
df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\
               .toDF(['index','result', 'identifier','identifiertype'])

使用 pivot 将两个字母的标识符更改为列名:

df_result = df_flattened.groupby(df_flattened.index,)\
                        .pivot("identifier")\
                        .agg(first("identifiertype"))\
                        .join(df,'index')

我添加了连接以获取result列的值。


那个完美地运作了,正是我所需要的。非常感谢你的帮助。 - robarthur1

0

假设您正在使用Spark 2.x,我认为您要寻找的是Spark DataFrame上的透视操作。

首先,您可以创建一个仅包含两列的表,即两个字母编码和另一列中的其余内容。然后,您可以在DataFrame上使用透视来执行此操作,如下所示。

df.pivot("encoding_col",Seq("AA","BB"))

你可以在这里找到一些关于数据框透视的好例子 这里


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接