我有一个 PySpark DataFrame,名为 df1,它的形式如下:
CustomerID CustomerValue
12 .17
14 .15
14 .25
17 .50
17 .01
17 .35
我有一个第二个 PySpark DataFrame,名为 df2,它是通过按 CustomerID 进行分组并使用 sum 函数进行聚合的 df1。它看起来像这样:
我有一个第二个 PySpark DataFrame,df2,它是通过将 df1 按 CustomerID 分组并使用 sum 函数进行聚合得到的。它长这样:
CustomerID CustomerValueSum
12 .17
14 .40
17 .86
我想在df1中添加第三列,该列为df1['CustomerValue']除以df2['CustomerValueSum']相同的CustomerIDs。这将看起来像这样:
CustomerID CustomerValue NormalizedCustomerValue
12 .17 1.00
14 .15 .38
14 .25 .62
17 .50 .58
17 .01 .01
17 .35 .41
换句话说,我正在尝试将这段 Python/Pandas 代码转换为 PySpark:
normalized_list = []
for idx, row in df1.iterrows():
(
normalized_list
.append(
row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
)
)
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
我该怎么做?