如何使用Spark查找10亿条记录中的最近邻?

10

给出包含以下信息的10亿条记录:

    ID  x1  x2  x3  ... x100
    1   0.1  0.12  1.3  ... -2.00
    2   -1   1.2    2   ... 3
    ...
对于上述每个ID,我想找到其最接近的前10个ID,基于它们的向量(x1, x2, ..., x100)的欧几里得距离。如何计算最佳方式?

2
你尝试了什么?我们要求您展示一下您迄今为止所尝试的内容,当您遇到困难或无法理解错误并且文档无法帮助时,我们会在这里提供帮助。此外,重要的是要包含易于其他用户复制和粘贴到自己环境中的示例数据,以便他们可以在自己的环境中跟随操作。 - Katya Willard
4个回答

9

8
执行对所有记录进行暴力比较是一场失败的战斗。我的建议是选择现成的k-最近邻算法实现,例如由scikit-learn提供的算法,然后广播索引和距离的结果并进一步处理。
在这种情况下,步骤如下:
1-像Bryce建议的那样将特征向量化,并让您的向量化方法返回一个浮点数列表(或numpy数组),其中元素数量与特征数量相同
2-将scikit-learn nn拟合到您的数据中:
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

3- 在您的向量化数据上运行训练好的算法(在您的情况下,训练和查询数据是相同的)。

distances, indices = nbrs.kneighbors(qpa)

步骤2和3将在您的pyspark节点上运行,在这种情况下不能并行化。您需要在此节点上拥有足够的内存。在我的情况下,处理150万条记录和4个特征需要一两秒钟时间。
在我们获得适用于spark的良好NN实现之前,我想我们必须坚持使用这些解决方法。如果您更愿意尝试新的东西,请尝试http://spark-packages.org/package/saurfang/spark-knn

4
实际上,你回答中的步骤3是可以并行化的:使用Sklearn的k-NN kneighbors()方法可以与Spark一起使用!我已经在这里发布了如何操作的信息:https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/ - xenocyon
谢谢你提醒我! - architectonic

1

你没有提供太多细节,但我处理这个问题的一般方法是:

  1. 将记录转换为类似于LabeledPoint的数据结构,其中(ID,x1..x100)作为标签和特征
  2. 映射每个记录并将该记录与所有其他记录进行比较(这里有很大的优化空间)
  3. 创建一些截止逻辑,以便一旦开始将ID = 5与ID = 1进行比较,您就会中断计算,因为您已经将ID = 1与ID = 5进行了比较
  4. 一些减少步骤以获得类似于{id_pair: [1,5], distance: 123}的数据结构
  5. 另一个映射步骤以查找每个记录的10个最近邻居

你已经确定了pyspark,我通常使用scala来完成这种类型的工作,但每个步骤的伪代码可能如下:

# 1. vectorize the features
def vectorize_raw_data(record)
    arr_of_features = record[1..99]
    LabeledPoint( record[0] , arr_of_features)

# 2,3 + 4 map over each record for comparison
broadcast_var = [] 
def calc_distance(record, comparison)
    # here you want to keep a broadcast variable with a list or dictionary of
    # already compared IDs and break if the key pair already exists
    # then, calc the euclidean distance by mapping over the features of
    # the record and subtracting the values then squaring the result, keeping 
    # a running sum of those squares and square rooting that sum
    return {"id_pair" : [1,5], "distance" : 123}    

for record in allRecords:
  for comparison in allRecords:
    broadcast_var.append( calc_distance(record, comparison) )

# 5. map for 10 closest neighbors

def closest_neighbors(record, n=10)
     broadcast_var.filter(x => x.id_pair.include?(record.id) ).takeOrdered(n, distance)

伪代码很糟糕,但我认为它传达了意图。由于要将所有记录与所有其他记录进行比较,因此会有大量的洗牌和排序。在我看来,您希望将键值对/距离存储在一个中央位置(例如广播变量,尽管这很危险)中,以减少执行的欧几里得距离计算总数。

0

@xenocyon的博客在格式和用法方面缺少很多信息,我在下面写了一小段代码以便更好地理解。

df = df.withColumn('vector_list', F.array('x1', 'x2', 'x3', ... , 'x100'))
vectors_collected = df.select(df['x1'],df['x2'], ... , df['x100']).rdd.map(list).collect()
knn = NearestNeighbors(n_neighbors=5).fit(vectors_collected)
broadcast_knn = spark.sparkContext.broadcast(knn)
knn_results = df.select(df['vector_list']).rdd.map(lambda x: broadcast_knn.value.kneighbors(x))

以下代码旨在复制与sklearn类似的距离和索引。
numpy_knn_results = np.array(knn_results.collect())
# k is 5, hence reshape with 5 each row.
distance = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[0::2]
indices = numpy_knn_results.reshape(numpy_knn_results.shape[0]*2, 5)[1::2]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接