How to rank column values according to a dictionary and keep the highest-ranked one?

Suppose I have a dataframe like this:
| id | col     |
| 1  | "A,B,C" |
| 2  | "D,C"   |
| 3  | "B,C,A" |
| 4  | None    |

And the dictionary is:

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

The output dataframe must be:
| id | col  |
| 1  | "A"  |
| 2  | "C"  |
| 3  | "A"  |
| 4  | None |
3 Answers

Here is another solution that uses @Nithish's struct-ordering approach, but with arrays_zip + array_min:
  1. Build an array of the weights from the dictionary (ordered by key)
  2. Zip the weights array with the sorted split result
  3. Take the minimum of the resulting array of structs (see the note after the output below for why the positional pairing works)
import pyspark.sql.functions as F

df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

result = df.withColumn(
    "col",
    F.array_min(
        F.arrays_zip(
            F.array(*[F.lit(d[x]) for x in sorted(d)]),  # weights, ordered by key: [1, 2, 3, 4]
            F.array_sort(F.split("col", ","))            # letters, sorted alphabetically
        )
    )["1"]  # field "1" of the winning struct holds the letter
)

result.show()
#+---+----+
#| id| col|
#+---+----+
#|  1|   A|
#|  2|   C|
#|  3|   A|
#|  4|null|
#+---+----+
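
A note on why the positional zip works: arrays_zip pairs by position, not by key, so this relies on the dictionary ranks increasing with the alphabetical order of the keys, as they do in this d (A→1 … D→4). Where the split array is shorter than the weights array, arrays_zip pads with nulls, which never win the array_min comparison because the rank field is compared first. A minimal sketch inspecting the intermediate zip for one row (the zipped column name is just for illustration):

# Illustrative only: materialize the intermediate arrays_zip result for id=2 ("D,C").
debug = df.withColumn(
    "zipped",
    F.arrays_zip(
        F.array(*[F.lit(d[x]) for x in sorted(d)]),  # weights [1, 2, 3, 4]
        F.array_sort(F.split("col", ","))            # sorted letters, here ["C", "D"]
    )
)
debug.filter("id = 2").select("zipped").show(truncate=False)
# zipped for id=2 is roughly: [{1, C}, {2, D}, {3, null}, {4, null}]
# array_min picks {1, C}; its field "1" is "C"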

"Higher Order Functions - transform" can be used to pair each element in "col" with its rank from the dictionary; array_min then picks out the element with the lowest rank.
from pyspark.sql import functions as F
from itertools import chain

data = [(1, "A,B,C",),
        (2, "D,C",),
        (3, "B,C,A",),
        (4, None,), ]
df = spark.createDataFrame(data, ("id", "col", ))

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])

"""
Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
"""

(df.withColumn("col", F.split(F.col("col"), ",")) # Split the string to create an array
  .withColumn("mapper", mapper) # Add the mapper column to the dataframe
  .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))")) # Iterate over the array and look up each element's rank from the mapper
  .withColumn("col", F.array_min(F.col("col")).col) # array_min finds the minimum struct, comparing the first field (rank) first
).select("id", "col").show()

"""
+---+----+
| id| col|
+---+----+
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
+---+----+
"""

Comment: you can use the array_min function instead of sorting and taking the first element of the array. – blackbishop


I guess you want to sort the letters according to the values given in the dictionary d.

Then you could do it like this:

from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()

df = spark.createDataFrame(data=(Row(1, "A,B,C",),
                                 Row(2, "D,C",),
                                 Row(3, "B,C,A",),
                                 Row(4, None)),
                           schema="id:int, col:string")
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# Define a sort UDF that sorts the array according to the dictionary 'd', also handles None arrays
sort_udf = F.udf(lambda array: sorted(array,
                                      key=lambda x: d[x]) if array is not None else None,
                 T.ArrayType(T.StringType()))
df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
df.show()

"""
+---+----+
| id| col|
+---+----+
|  1|   A|
|  2|   C|
|  3|   A|
|  4|null|
+---+----+
"""





