How to find multiple modes of an array column in PySpark

I want to find the mode of the task column in this dataframe:
+-----+-----------------------------------------+
|  id |              task                       |
+-----+-----------------------------------------+
| 101 |   [person1, person1, person3]           |
| 102 |   [person1, person2, person3]           |
| 103 |           null                          |
| 104 |   [person1, person2]                    |
| 105 |   [person1, person1, person2, person2]  |
| 106 |           null                          |
+-----+-----------------------------------------+

If there are multiple modes, I want to display all of them.
Can someone help me get this output?
+-----+-----------------------------------------+---------------------------+
|  id |              task                       |           mode            |
+-----+-----------------------------------------+---------------------------+
| 101 |   [person1, person1, person3]           |[person1]                  |
| 102 |   [person1, person2, person3]           |[person1, person2, person3]|
| 103 |           null                          |[]                         |
| 104 |   [person1, person2]                    |[person1, person2]         |
| 105 |   [person1, person1, person2, person2]  |[person1, person2]         |
| 106 |           null                          |[]                         |
+-----+-----------------------------------------+---------------------------+

This is my first question. Any help or hints would be much appreciated. Thank you.

If you are on spark >= 2.4.0 you can use the built-in array_intersect function, e.g.: df.withColumn('intersection', array_intersect(df['task'], df['mode'])) - abiratsis
3 Answers


I think in Spark 2.4+ we don't need a UDF for this, since we can get the desired output with higher-order functions. They will also be faster on big data than a Counter-based UDF, because the expressions are evaluated inside the JVM and avoid Python serialization overhead.

from pyspark.sql import functions as F

# Step 1: for every distinct element x, build the pair [count_of_x, x].
# Step 2: keep only the pairs whose count equals the maximum count, then drop the counts.
# Note: array() coerces the count to string here, so counts of 10+ would compare
# lexicographically in array_max; fine for short task lists like these.
df\
  .withColumn("most_common", F.expr("""transform(array_distinct(values),
                                      x -> array(aggregate(values, 0, (acc, t) -> acc + IF(t = x, 1, 0)), x))"""))\
  .withColumn("most_common", F.expr("""transform(filter(most_common, x -> x[0] == array_max(most_common)[0]),
                                      y -> y[1])"""))\
  .show(truncate=False)

#+---+----------------------------------------+---------------------------+
#|id |values                                  |most_common                |
#+---+----------------------------------------+---------------------------+
#|1  |[good, good, good, bad, bad, good, good]|[good]                     |
#|2  |[bad, badd, good, bad,, good, bad, good]|[bad, good]                |
#|2  |[person1, person2, person3]             |[person1, person2, person3]|
#|2  |null                                    |null                       |
#+---+----------------------------------------+---------------------------+
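
The expected output in the question shows [] rather than null for ids 103 and 106, while the snippet above returns null for a null task list. A minimal sketch of one way to adapt it to the question's df (assumes columns id and task; coalesce swaps a null list for an empty array before the transforms run):

from pyspark.sql import functions as F

result = (df
    # null task lists become empty arrays, so the mode ends up as [] instead of null
    .withColumn("values", F.expr("coalesce(task, array())"))
    .withColumn("mode", F.expr("""transform(array_distinct(values),
        x -> array(aggregate(values, 0, (acc, t) -> acc + IF(t = x, 1, 0)), x))"""))
    .withColumn("mode", F.expr("""transform(filter(mode,
        x -> x[0] == array_max(mode)[0]), y -> y[1])"""))
    .select("id", "task", "mode"))

result.show(truncate=False)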


For Spark 2.3:

You can solve this with a custom UDF. To get multiple mode values, I used a Counter. Inside the UDF, an except block handles the null values in your task column.
(For Python 3.8+ users, there is a built-in statistics.multimode() function.)
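
A minimal sketch of that statistics.multimode() alternative, assuming the executors run Python 3.8+ and the same df built in the next snippet:

import statistics

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# statistics.multimode returns every value tied for the highest count,
# and returns [] for empty input, so only null needs a guard.
multimode_udf = F.udf(lambda arr: statistics.multimode(arr or []), ArrayType(StringType()))

df.withColumn("multi_mode", multimode_udf("task")).show(truncate=False)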

Your dataframe:

from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

schema = StructType([StructField("id", IntegerType()), StructField("task", ArrayType(StringType()))])
data = [[101, ["person1", "person1", "person3"]], [102, ["person1", "person2", "person3"]], [103, None], [104, ["person1", "person2"]], [105, ["person1", "person1", "person2", "person2"]], [106, None]]

df = spark.createDataFrame(data,schema=schema)

Operation:

from collections import Counter

def get_multi_mode_list(input_array):
    multi_mode = []
    counter_var = Counter(input_array)  # Counter(None) is simply an empty Counter
    try:
        # frequency of the most common element
        temp = counter_var.most_common(1)[0][1]
    except IndexError:
        # null/empty task list: most_common(1) is [], so nothing matches below
        temp = counter_var.most_common(1)
    for i in counter_var:
        if input_array.count(i) == temp:
            multi_mode.append(i)
    return list(set(multi_mode))


get_multi_mode_list_udf = F.udf(get_multi_mode_list, ArrayType(StringType()))

df.withColumn("multi_mode", get_multi_mode_list_udf(col("task"))).show(truncate=False)

Output:

+---+------------------------------------+---------------------------+
|id |task                                |multi_mode                 |
+---+------------------------------------+---------------------------+
|101|[person1, person1, person3]         |[person1]                  |
|102|[person1, person2, person3]         |[person2, person3, person1]|
|103|null                                |[]                         |
|104|[person1, person2]                  |[person2, person1]         |
|105|[person1, person1, person2, person2]|[person2, person1]         |
|106|null                                |[]                         |
+---+------------------------------------+---------------------------+
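
Note that the order inside multi_mode differs from the question's expected output (e.g. [person2, person3, person1]), because Counter iteration order isn't sorted. If a deterministic order matters, a small sketch using Spark's built-in sort_array on top of the UDF above:

# sort_array gives the mode list a stable alphabetical order
df.withColumn("multi_mode", sort_array(get_multi_mode_list_udf(col("task"))))\
  .show(truncate=False)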

Thank you so much for your help! This is exactly what I was looking for. - user11526014

from collections import Counter
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame([
    (101, ["person1", "person1", "person3"]),
    (102, ["person1", "person2", "person3"]),
    (103, None),
    (104, ["person1", "person2"]),
    (105, ["person1", "person1", "person2", "person2"])],
    ["id", "List"])

def mode(list1):
    # guard against null or empty task lists up front
    if list1 is None or len(list1) == 0:
        return []
    res = []
    test_list1 = Counter(list1)
    # frequency of the most common element
    temp = test_list1.most_common(1)[0][1]
    for ele in list1:
        if list1.count(ele) == temp:
            res.append(ele)
    return list(set(res))

df.createOrReplaceTempView("A")
spark.udf.register("mode", mode, ArrayType(StringType()))
spark.sql("select id, List, mode(List) AS func from A").show(truncate=False)

Thanks for your answer. But when I apply it to my dataframe I get an "IndexError: list index out of range" error. For id=103 you have an empty list, but I get null. I think that's where the error comes from. - user11526014
@Whimsy I've edited the code so it no longer throws the exception. - Addy
