Ultimately what I want is the mode of every column in a DataFrame. For other summary statistics I see a couple of options: use DataFrame aggregation, or map the columns of the DataFrame to an RDD of vectors (something I am also having trouble with) and use colStats from MLlib. But I don't see mode as an option there.

Mode has the same problem as the median: while it is easy to compute, the computation is rather expensive. It can be done either with a sort followed by local and global aggregations, or with just another word count plus a filter:
import numpy as np
from pyspark.sql import functions as F

np.random.seed(1)

df = sc.parallelize([
    (int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])

cnts = df.groupBy("x").count()

mode = cnts.join(
    cnts.agg(F.max("count").alias("max_")),
    F.col("count") == F.col("max_")
).limit(1).select("x")

mode.first()[0]
## 0
Either way it may require a full shuffle for each column.
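For comparison, a minimal sketch of the sort-based route mentioned above, using the df with column x from the previous snippet: rank the counts with a window function and keep the top-ranked row. Note that the unpartitioned window moves all counts to a single partition, so this is a sketch rather than a recommendation:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

cnts = df.groupBy("x").count()
# rank values by descending frequency; the row with rn == 1 is the mode
w = Window.orderBy(F.desc("count"))
cnts.withColumn("rn", F.row_number().over(w)).filter("rn = 1").select("x").first()[0]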
This line of code will return the mode of "col" in the Spark dataframe df:
df.groupby("col").count().orderBy("count", ascending=False).first()[0]
To get a list with the mode of every column in df, use:
[df.groupby(i).count().orderBy("count", ascending=False).first()[0] for i in df.columns]
To attach the column name that each mode belongs to, build a 2D list:
[[i,df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]
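If a lookup by column name is more convenient, the same logic fits in a dict comprehension (a small sketch of the idea above, nothing new beyond the container type):

modes = {c: df.groupby(c).count().orderBy("count", ascending=False).first()[0]
         for c in df.columns}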
Spark 3.4+ has mode:

F.mode(column)

Full example:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(2, 7),
     (1, 8),
     (1, 9),
     (1, None),
     (2, None)],
    ['c1', 'c2'])

df.agg(*[F.mode(c).alias(c) for c in df.columns]).show()
df.agg(*[F.mode(c).alias(c) for c in df.columns]).show()
# +---+---+
# | c1| c2|
# +---+---+
# | 1| 7|
# +---+---+
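As an aside (a hedged sketch reusing the column names from the example above): F.mode also works inside groupBy aggregations, e.g. the most frequent c2 for each c1:

# mode of c2 within each c1 group (Spark 3.4+)
df.groupBy("c1").agg(F.mode("c2").alias("c2_mode")).show()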
from pyspark.sql.functions import monotonically_increasing_id

def get_mode(df):
    # compute value counts for every column, ordered by descending count
    column_lst = df.columns
    res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
    # keep the top row (the mode) of the first column and tag it with a synthetic id
    df_mode = res[0].limit(1).select(column_lst[0]) \
        .withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
    # join the mode of each remaining column on the synthetic id
    for i in range(1, len(res)):
        df2 = res[i].limit(1).select(column_lst[i]) \
            .withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
        df_mode = df_mode.join(
            df2,
            df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id
        ).drop(df2.temp_name_monotonically_increasing_id)
    return df_mode.drop("temp_name_monotonically_increasing_id")
It works for categorical and numeric data types alike.
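A usage sketch (df here is any Spark DataFrame; the result is a single-row DataFrame with one column per input column, holding that column's mode):

modes_df = get_mode(df)
modes_df.show()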
from pyspark.sql.functions import col, udf, collect_list
import statistics
# define a UDF to calculate mode
def mode_udf(data):
    if len(data) == 0:
        return None
    return statistics.mode(data)  # similar for mean, median
# register the UDF
mode_func = udf(mode_udf)
# create a sample dataframe
data = [("apple", 1), ("orange", 2), ("apple", 2), ("banana", 4), ("orange", 12), ("orange", 2), ("apple", 3), ("apple", 0), ("apple", 3),("apple", 2), ("apple", 2), ("banana", 7), ("banana", 4)]
df = spark.createDataFrame(data, ["fruit", "quantity"])
# calculate the mode of "quantity" within each "fruit" group
mode_df = df.groupBy("fruit").agg(mode_func(collect_list("quantity")).alias("quantity_mode"))
# show the result
mode_df.show()
Note: please handle null/invalid values in your data, otherwise you may get unexpected output.
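A minimal sketch of the null handling that note suggests, using the dataframe from this answer: filter out null quantities before collecting, so statistics.mode never sees None:

# drop null quantities first, then aggregate as before
mode_df = (df.filter(col("quantity").isNotNull())
             .groupBy("fruit")
             .agg(mode_func(collect_list("quantity")).alias("quantity_mode")))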
case MODE:
    Dataset<Row> cnts = ds.groupBy(column).count();
    Dataset<Row> dsMode = cnts.join(
            cnts.agg(functions.max("count").alias("max_")),
            functions.col("count").equalTo(functions.col("max_")));
    Dataset<Row> mode = dsMode.limit(1).select(column);
    replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
    ds = replaceWithValue(ds, column, replaceValue);
    break;

private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) {
    return ds.withColumn(column,
            functions.coalesce(functions.col(column), functions.lit(replaceValue)));
}
>>> df=newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
See my result:
>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
| 3av_| 64|
| 7A3j| 509|
| g5HH| 1489|
| oT7d| 109|
| DM_V| 149|
| uKAI|44883|
+------+-----+
# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]
# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'
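A lighter-weight variant of the last step: first() fetches only the top row instead of collecting the whole ordered result to the driver:

>>> mode = df.orderBy(df['count'].desc()).first()[0]
>>> mode
'uKAI'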
First, group by the column and count (ignoring null values), and take the maximum count (i.e. the most frequent value). Second, look up the key that has that maximum count:
from pyspark.sql import functions as F
count_mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).agg(F.max("count")).collect()[0][0]
mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).filter(F.col("count") == count_mode_val).select("column_name").collect()[0][0]
Comments: builtins.max rather than pyspark.sql.functions.max - zero323; na.drop("column_name") - zero323
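Following the comments, a sketch that fuses the two jobs into one: drop the nulls with na.drop, count, and take the top row directly (column_name is the same placeholder used above):

from pyspark.sql import functions as F

# one pass: drop nulls, count, order by frequency, keep the top key
mode_val = (df.na.drop(subset=["column_name"])
              .groupBy("column_name").count()
              .orderBy(F.desc("count"))
              .first()[0])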