PySpark中的KMeans聚类

17

我有一个名为'mydataframe'的Spark数据帧,其中包含许多列。我正在尝试仅对两列进行kmeans聚类:纬度和经度(使用它们作为简单值)。我想基于这两个列提取7个簇,然后将集群分配附加到我的原始数据框中。我尝试过:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

但是一段时间后我收到一个错误信息:

org.apache.spark.SparkException: 由于阶段失败而中止作业:阶段5191.0中的任务1失败了4次,最近失败:在阶段5191.0中丢失了任务1.3(TID 260738,10.19.211.69,执行程序1):org.apache.spark.api.python.PythonException:Traceback(最近调用的最后一次)

我已经尝试过分离和重新连接集群。同样的结果。我做错了什么吗?


1
在地理数据上,使用Haversine距离,不要使用kmeans。 - Has QUIT--Anony-Mousse
@Anony-Mousse - 哇,谢谢!您会推荐更适合纬度和经度聚类的方法吗? - user3245256
Haversine距离和OPTICS聚类。 - Has QUIT--Anony-Mousse
2个回答

79

由于根据你之前的另一个问题,我猜测你在Spark集群方面还处于非常初级的阶段(你甚至导入了sqrtarray,但从未使用过它们,可能是因为在文档示例中就是这样),所以让我提供一些更普遍的建议,而不是针对你在这里提出的具体问题(希望也能帮助你避免随后打开3-4个问题,试图将你的集群分配回到你的数据框中)...

既然

  1. 你已经有了数据框

  2. 你想将集群成员身份附加回到你的初始数据框中

你没有理由回退到RDD并使用(即将弃用)MLlib包;你将使用(现在推荐的)ML包直接与数据框一起更轻松、优雅和高效地完成任务。

步骤0 - 制作一些类似于你的玩具数据:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                              [1, 40.4, -20.5],
                              [2, 28., -23.9],
                              [3, 29.5, -19.0],
                              [4, 32.8, -18.84]
                             ],
                              ["other","lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

步骤1 - 收集你的特征

与大多数机器学习包不同,Spark ML 要求你的输入特征被收集在数据帧的一个 单独列中,通常命名为 features;而且它提供了一个特定的方法来实现这一点,即 VectorAssembler

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

也许你已经猜到了,参数inputCols是用来告诉VectoeAssembler我们数据集中哪些列要被用作特征。

步骤2 - 拟合你的KMeans模型

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features')在这里用于告诉算法要使用数据框的哪一列进行聚类 - 请记住,在上面的步骤1之后,您原始的latlong特征不再直接使用。

步骤3 - 转换您的初始数据框以包括群集分配。

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

transformed数据框的最后一列prediction显示了聚类分配情况 - 在我的玩具案例中,我得到了4条记录在聚类#0中,1条记录在聚类#1中。

您可以使用select语句进一步操作transformed数据框,甚至可以dropfeatures列(该列已经完成了其功能,可能不再需要)...

希望现在您更接近于实现您最初想要达到的目标。对于提取聚类统计信息等,我最近的另一个答案可能会有所帮助...


2
亲爱的desertnaut,非常感谢您抽出时间编写了我所读过的最好的stackoverflow答案。我一定会将其作为一个优秀的参考资料。是的,你猜对了 - 我会问更多的问题! :) 我不知道我正在使用一些旧的、已弃用的库,我很高兴你向我展示了“正确的道路”。我在你的优秀解释中理解了一切。还有一个小问题(与Spark相关而不是kMeans相关):即使df非常大,从存储和内存的角度来看,制作越来越多的新数据框(df,然后df_new)是否可以? - user3245256
@user3245256 标准做法是在进行数据转换时逐步将其分配到新的数据框中。无论如何,可以进行实验并查看结果... - desertnaut
正如@desertnaut所提到的,将数据转换为RDD进行机器学习操作是非常低效的。话虽如此,遗憾的是,即使在pyspark.ml.clustering库中的KMeans方法中,获取模型输出时仍然使用了collect函数。这使得在对大量数据应用Kmeans时,Spark的能力变得无用,所有的工作节点都将处于空闲状态,只有您的驱动节点会加班工作。 - thentangler
运行这段完全相同的代码,在你的示例中给出的分类结果是0011,而不是0100。也许在新版本中,算法的一些默认值已经改变了?(我使用的是pyspark 3.4.1) - undefined

6

尽管我有其他通用答案,但如果出于任何原因您必须坚持使用MLlib和RDD,请看看下面是什么原因导致您在使用相同的示例df时出现错误。

当您从数据框中选择要转换为RDD的列时,结果是一个的RDD:

df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]

这不适合作为MLlib KMeans的输入。你需要进行map操作才能使其正常工作:

df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

所以,你的代码应该像这样:

So, 你的代码应该像这样:

from pyspark.mllib.clustering import KMeans, KMeansModel

rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]

很好的添加。有一点,collect()返回列表,您也可以将数据框发送到kmeans训练模型。 - Yamur
我们只在最终结果时使用 collect;如果我们可以在这里使用它,那么使用Spark就没有任何意义了 - 我们最好使用scikit-learn或类似的工具... - desertnaut

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接