Given 1 billion records containing the following information:
ID x1 x2 x3 ... x100
1 0.1 0.12 1.3 ... -2.00
2 -1 1.2 2 ... 3
...
For each ID above, I want to find its 10 closest IDs, based on the Euclidean distance between their vectors (x1, x2, ..., x100). What is the best way to compute this?
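For reference, the distance in question between two records a and b over the 100 features is shown below. Because the square root is monotonic, ranking candidates by the squared distance gives the same 10 closest IDs, so the root can be skipped when only the ordering matters.

```latex
d(a, b) = \sqrt{\sum_{i=1}^{100} \left(x_i^{(a)} - x_i^{(b)}\right)^2}
```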
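Use the nearest-neighbors algorithm provided by scikit-learn, then broadcast the resulting indices and distances for further processing.
from sklearn.neighbors import NearestNeighbors
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)
3- Run the trained algorithm on your vectorized data (in your case, the training and query data are the same).
distances, indices = nbrs.kneighbors(vectorized_data)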
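A small, self-contained sketch of the same scikit-learn steps, assuming random toy data in place of the real 1 billion rows (the names `ids` and `X` are hypothetical). One detail worth knowing: when `kneighbors()` is called with no argument, it queries the fitted points themselves and, per the scikit-learn documentation, does not count a point as its own neighbor, so `n_neighbors=10` yields ten *other* IDs. Passing the training data back in explicitly would instead return each point as its own first neighbor at distance 0.

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors

# Hypothetical toy data: 1,000 records with 100 features each.
rng = np.random.default_rng(0)
ids = np.arange(1, 1001)            # record IDs, aligned with the rows of X
X = rng.normal(size=(1000, 100))    # stands in for vectorized_data

nbrs = NearestNeighbors(n_neighbors=10, algorithm="auto").fit(X)

# No query argument: every fitted point is queried and excluded from its own results.
distances, indices = nbrs.kneighbors()

# indices are row positions in X; map them back to record IDs.
neighbor_ids = ids[indices]         # shape (1000, 10)
print(ids[0], neighbor_ids[0])
```

This still needs the whole dataset in one process, so at the scale in the question it only illustrates the API, not a deployment plan.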
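You haven't provided many details, but my general approach to this problem would be to compare the records pairwise and store each result in a data structure like
{id_pair: [1,5], distance: 123}
You've tagged the question pyspark. I usually use Scala for this kind of work, but code for each step might look roughly like this: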
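import math
from pyspark.mllib.regression import LabeledPoint

# 1. vectorize the features
def vectorize_raw_data(record):
    # record = [ID, x1, ..., x100]: the ID becomes the label, x1..x100 the features
    return LabeledPoint(record[0], record[1:])

# 2, 3 + 4: compare every record with every other record
compared = set()  # ID pairs already compared (Spark broadcast variables are read-only, so this is driver-local state)
results = []      # entries like {"id_pair": [1, 5], "distance": 123}

def calc_distance(record, comparison):
    pair = tuple(sorted((record.label, comparison.label)))
    # skip self-comparisons and ID pairs that were already compared
    if record.label == comparison.label or pair in compared:
        return None
    compared.add(pair)
    # Euclidean distance: subtract the features pairwise, square the differences,
    # keep a running sum of the squares, then take the square root of that sum
    squared_sum = sum((a - b) ** 2 for a, b in zip(record.features.toArray(), comparison.features.toArray()))
    return {"id_pair": list(pair), "distance": math.sqrt(squared_sum)}

# all_records: the list of LabeledPoints produced by vectorize_raw_data
for record in all_records:
    for comparison in all_records:
        result = calc_distance(record, comparison)
        if result is not None:
            results.append(result)

# 5. take the n closest neighbors of a record
def closest_neighbors(record, n=10):
    matches = [r for r in results if record.label in r["id_pair"]]
    return sorted(matches, key=lambda r: r["distance"])[:n]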
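The pairwise approach above computes every distance and then sorts per ID. On a sample that fits in memory, the same exact brute-force search can be sketched with NumPy. This is a swapped-in vectorized variant, not the answerer's code: it keeps only each row's top n via `np.argpartition` instead of a global list of pairs, and the function name `top_n_neighbors` is hypothetical. It is still O(N²) in time and memory, so it does not scale to 1 billion rows as-is.

```python
import numpy as np

def top_n_neighbors(X, n=10):
    """Exact brute-force n nearest neighbors (Euclidean) for every row of X.

    Returns (indices, distances), each of shape (len(X), n), sorted by distance.
    Builds the full N x N distance matrix, so only use it on small samples.
    Requires n < len(X).
    """
    X = np.asarray(X, dtype=float)
    sq_norms = (X ** 2).sum(axis=1)
    # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, computed for all pairs at once
    sq_dist = sq_norms[:, None] + sq_norms[None, :] - 2.0 * X @ X.T
    np.maximum(sq_dist, 0.0, out=sq_dist)   # clip tiny negatives from rounding
    np.fill_diagonal(sq_dist, np.inf)       # a record is not its own neighbor
    # argpartition moves the n smallest entries of each row to the front...
    part = np.argpartition(sq_dist, n, axis=1)[:, :n]
    part_d = np.take_along_axis(sq_dist, part, axis=1)
    # ...then only those n candidates are sorted
    order = np.argsort(part_d, axis=1)
    indices = np.take_along_axis(part, order, axis=1)
    distances = np.sqrt(np.take_along_axis(part_d, order, axis=1))
    return indices, distances
```

The squared-norm expansion avoids materializing an N x N x 100 difference array, and `argpartition` avoids fully sorting each row when only the top n are needed.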
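@xenocyon's answer leaves out a lot of information about data formats and usage, so I wrote a short snippet below to make it easier to understand.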
import numpy as np
from pyspark.sql import functions as F
from sklearn.neighbors import NearestNeighbors

feature_cols = [f"x{i}" for i in range(1, 101)]  # x1 ... x100
df = df.withColumn('vector_list', F.array(*feature_cols))
# collect all vectors to the driver and fit the model there
vectors_collected = df.select(*feature_cols).rdd.map(list).collect()
knn = NearestNeighbors(n_neighbors=5).fit(vectors_collected)
# ship the fitted model to every executor
broadcast_knn = spark.sparkContext.broadcast(knn)
# query each row as a one-row 2D array; kneighbors returns (distances, indices)
knn_results = df.select('vector_list').rdd.map(
    lambda row: broadcast_knn.value.kneighbors([row.vector_list])).collect()
# k is 5, so each row yields 5 distances and 5 indices
distances = np.array([dist[0] for dist, _ in knn_results])
indices = np.array([idx[0] for _, idx in knn_results])  # row positions in vectors_collected, not IDs
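In the snippet above, `indices` holds row positions in `vectors_collected`, not record IDs, and because the query vectors are the fitted vectors, each row also finds itself at distance 0. So to end up with 10 other IDs per record, the model would need `n_neighbors=11`. A minimal post-processing sketch, assuming the IDs were collected in the same order as the vectors (for example by selecting `ID` together with the features in one pass); the function name `drop_self` is hypothetical:

```python
import numpy as np

def drop_self(ids, distances, indices):
    """Map neighbor positions to IDs and remove each row's own entry.

    ids: 1-D array of record IDs, in the same order as the fitted vectors.
    distances, indices: (N, k) arrays as returned by kneighbors.
    Returns (neighbor_ids, neighbor_distances), each with k - 1 columns.
    Assumes every row's own position appears exactly once in its results.
    """
    ids = np.asarray(ids)
    rows = np.arange(len(indices))[:, None]
    keep = indices != rows          # False where a row matched itself
    k = indices.shape[1] - 1
    # boolean indexing flattens row by row, so reshaping restores the rows
    neighbor_pos = indices[keep].reshape(-1, k)
    neighbor_dist = distances[keep].reshape(-1, k)
    return ids[neighbor_pos], neighbor_dist
```

Masking by position, rather than simply dropping the first column, avoids relying on the order of ties at distance 0, but exact duplicate vectors can still break the one-self-match assumption.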