如何通过一个数组列来压缩pySpark数据框?

3

我有一个类似这样的Spark DataFrame:

+------+--------+--------------+--------------------+
|   dbn|    boro|total_students|                sBus|
+------+--------+--------------+--------------------+
|17K548|Brooklyn|           399|[B41, B43, B44-SB...|
|09X543|   Bronx|           378|[Bx13, Bx15, Bx17...|
|09X327|   Bronx|           543|[Bx1, Bx11, Bx13,...|
+------+--------+--------------+--------------------+

我该如何使每一行复制sBus中的每个元素,并且将sBus变成普通字符串列?
结果应该像这样:
+------+--------+--------------+--------------------+
|   dbn|    boro|total_students|                sBus|
+------+--------+--------------+--------------------+
|17K548|Brooklyn|           399| B41                |
|17K548|Brooklyn|           399| B43                |
|17K548|Brooklyn|           399| B44-SB             |
+------+--------+--------------+--------------------+

and so on...


1
你能提供期望的输出吗?你期望得到sBussSw之间的笛卡尔积作为结果吗? - zero323
感谢!已添加预期结果。为简单起见,删除了sSw列。 - Philipp_Kats
1
你可以使用 explode 函数(例如请参考 http://stackoverflow.com/q/36484385/1560062),但如果你有多个列,那么这并不是那么简单的。 - zero323
1个回答

2

我想不出一种方法来做到这一点,而不将其转换为RDD。

# convert df to rdd
rdd = df.rdd

def extract(row, key):
    """Takes dictionary and key, returns tuple of (dict w/o key, dict[key])."""
    _dict = row.asDict()
    _list = _dict[key]
    del _dict[key]
    return (_dict, _list)


def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict


# preserve rest of values in key, put list to flatten in value
rdd = rdd.map(lambda x: extract(x, 'sBus'))
# make a row for each item in value
rdd = rdd.flatMapValues(lambda x: x)
# add flattened value back into dictionary
rdd = rdd.map(lambda x: add_to_dict(x[0], 'sBus', x[1]))
# convert back to dataframe
df = sqlContext.createDataFrame(rdd)

df.show()

难点在于如何将其他列与新压平的值一起保持。我通过将每行映射为一个(其他列的字典,要压平的列表)元组,并调用flatMapValues来实现。这将把值列表的每个元素拆分成单独的行,但保持键的附加,即:

(key, ['A', 'B', 'C'])

变成

(key, 'A')
(key, 'B')
(key, 'C')

然后,我将压平的值移回到其他列的字典中,并将其重新转换为DataFrame。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接