PySpark DataFrame: change cell values based on min/max conditions on another column

I have the following pySpark dataframe:

+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046|   6.607603368496324|             1|   null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|   null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|   null|
| 2.608497168338446| 3.529397129549324|   0.373034222141551|             2|   null|
+------------------+------------------+--------------------+--------------+-------+

This is a fairly simple operation that I can easily do with pandas. However, I need to do it using only pySpark.
I would like to do the following (written as pseudocode):
In the row where col3 == max(col3), change Y from null to 'K'.
In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'.
In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'.
In the remaining row: change Y from null to 'I'.
Thus, the desired output is:
+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046|   6.607603368496324|             1|      K|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|      U|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|      I|
| 2.608497168338446| 3.529397129549324|   0.373034222141551|             2|      Z|
+------------------+------------------+--------------------+--------------+-------+
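
For reference, the pandas logic I have in mind is roughly this (a sketch using just the two columns the rules depend on; the labels are applied in reverse priority order so the later assignments win):

import pandas as pd

# Sketch with only col1 and col3; values are taken from the example above.
pdf = pd.DataFrame({
    "col1": [2.1729247374294496, 0.2654841575294071,
             0.4253301781296708, 2.608497168338446],
    "col3": [6.607603368496324, 0.023578679968183733,
             0.11711202266039554, 0.373034222141551],
})

# Default label first, then overwrite in reverse priority order: K > Z > U > I.
pdf["Y"] = "I"
pdf.loc[pdf["col1"] == pdf["col1"].min(), "Y"] = "U"
pdf.loc[pdf["col1"] == pdf["col1"].max(), "Y"] = "Z"
pdf.loc[pdf["col3"] == pdf["col3"].max(), "Y"] = "K"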

Once the above is done, I need to use this table as a lookup table for another table:
+--------------------+--------+-----+------------------+--------------+------------+
|                  x1|      x2|   x3|                x4|             X|           d|
+--------------------+--------+-----+------------------+--------------+------------+
|0057f68a-6330-42a...|    2876|   30| 5.989999771118164|             0|    20171219|
|05cc0191-4ee4-412...|  108381|   34|24.979999542236328|             3|    20171219|
|06f353af-e9d3-4d0...|  118798|   34|               0.0|             3|    20171219|
|0c69b607-112b-4f3...|   20993|   34|               0.0|             0|    20171219|
|0d1b52ba-1502-4ff...|   23817|   34|               0.0|             0|    20171219|

I want to use the first table as a lookup table to create a new column in the second table. The value of the new column should come from the Y column of the first table, found by matching the X column of the second table against the X column of the first table.
UPDATE: I need a robust solution that also handles the case where a single row satisfies two of the conditions, for example:
+------------------+------------------+--------------------+--------------+-------+
|              col1|              col2|                col3|             X|      Y|
+------------------+------------------+--------------------+--------------+-------+
| 2.608497168338446| 3.558069532647046|   6.607603368496324|             1|   null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733|             0|   null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554|             3|   null|
|2.1729247374294496| 3.529397129549324|   0.373034222141551|             2|   null|
+------------------+------------------+--------------------+--------------+-------+

In this case row 0 satisfies both the max('col3') and the max('col1') condition.
So the following needs to happen:
Row 0 gets 'K'.
Row 3 gets 'Z' (because among the remaining rows, row 0 already has 'K', row 3 satisfies the max('col1') condition).
Row 1 gets 'U'.
Row 2 gets 'I'.
There cannot be more than one row with 'I' in table one.
1 Answer

Compute the aggregate values:

from pyspark.sql import functions as F

df = spark.createDataFrame([
    (2.1729247374294496,  3.558069532647046,    6.607603368496324, 1),
    (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0),
    (0.4253301781296708, 3.4566490739823483,  0.11711202266039554, 3),
    (2.608497168338446,  3.529397129549324,    0.373034222141551, 2)
], ("col1", "col2", "col3", "x"))

# .first() returns a single Row, which is unpacked into three plain Python values
min1, max1, max3 = df.select(F.min("col1"), F.max("col1"), F.max("col3")).first()

Add the column using when:
y = (F.when(F.col("col3") == max3, "K")  # In row where col3 == max(col3), change Y from null to 'K'
    .when(F.col("col1") == max1, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
    .when(F.col("col1") == min1, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
    .otherwise("I"))                     # In the remaining row: change Y from null to 'I'

df_with_y = df.withColumn("y", y)


df_with_y.show()
# +------------------+------------------+--------------------+---+---+
# |              col1|              col2|                col3|  x|  y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
# | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  Z|
# +------------------+------------------+--------------------+---+---+

The value of the new column should be looked up in the Y column of the first table, using the X column of the second table as the key:
df_with_y.select("x", "y").join(df2, ["x"])
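
For example, with a small stand-in for the second table (the names and values below are purely illustrative), a left join keeps every row of the second table and attaches the label:

# Stand-in for the second table; only the x column matters for the lookup.
df2 = spark.createDataFrame([
    ("id-a",   2876, 30,  5.99, 0, "20171219"),
    ("id-b", 108381, 34, 24.98, 3, "20171219"),
    ("id-c",  20993, 34,   0.0, 0, "20171219"),
], ("x1", "x2", "x3", "x4", "x", "d"))

# Left join on x: every row of df2 keeps its columns and gains the label y.
df2.join(df_with_y.select("x", "y"), on="x", how="left").show()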

If y already exists and you want to keep the non-null values:
df_ = spark.createDataFrame([
    (2.1729247374294496,  3.558069532647046,    6.607603368496324, 1, "G"),
    (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0, None),
    (0.4253301781296708, 3.4566490739823483,  0.11711202266039554, 3, None),
    (2.608497168338446,  3.529397129549324,    0.373034222141551, 2, None)
], ("col1", "col2", "col3", "x", "y"))

# Compute the aggregates only over the rows where y is still null
min1_, max1_, max3_ = df_.filter(F.col("y").isNull()).select(F.min("col1"), F.max("col1"), F.max("col3")).first()

y_ = (F.when(F.col("col3") == max3_, "K") 
    .when(F.col("col1") == max1_, "Z")
    .when(F.col("col1") == min1_, "U") 
    .otherwise("I"))

df_.withColumn("y", F.coalesce(F.col("y"), y_)).show()


# +------------------+------------------+--------------------+---+---+
# |              col1|              col2|                col3|  x|  y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  G|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
# | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  K|
# +------------------+------------------+--------------------+---+---+

If you run into numerical precision issues, you can try the following:

threshold = 0.0000001  # Choose an appropriate value

y_t = (F.when(F.abs(F.col("col3") - max3) < threshold, "K")  # In row where col3 == max(col3), change Y from null to 'K'
    .when(F.abs(F.col("col1") - max1) < threshold, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
    .when(F.abs(F.col("col1") - min1) < threshold, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
    .otherwise("I"))                     # In the remaining row: change Y from null to 'I'

df.withColumn("y", y_t).show()
# +------------------+------------------+--------------------+---+---+
# |              col1|              col2|                col3|  x|  y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046|   6.607603368496324|  1|  K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733|  0|  U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554|  3|  I|
# | 2.608497168338446| 3.529397129549324|   0.373034222141551|  2|  Z|
# +------------------+------------------+--------------------+---+---+
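
The update in the question asks for a solution that stays robust when a single row satisfies two of the conditions. The when chain above checks its branches in order for each row, but the aggregates are computed once over the whole frame, so in the tie example row 3 would fall through to 'I' instead of getting 'Z'. One possible sketch (not part of the original answer) is to assign the labels one at a time, recomputing each aggregate only over the rows that are still unlabelled:

labelled = df.withColumn("y", F.lit(None).cast("string"))

# (label, column, aggregate) in priority order.
rules = [("K", "col3", F.max), ("Z", "col1", F.max), ("U", "col1", F.min)]

for label, col, agg in rules:
    # Aggregate only over the rows that have not been labelled yet.
    value = labelled.filter(F.col("y").isNull()).select(agg(col)).first()[0]
    labelled = labelled.withColumn(
        "y",
        F.when(F.col("y").isNull() & (F.col(col) == value), label)
         .otherwise(F.col("y")))

# Whatever is still unlabelled becomes 'I' (at most one row in this example).
labelled = labelled.withColumn("y", F.coalesce(F.col("y"), F.lit("I")))
labelled.show()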
