Spark Python:如何计算RDD中每行之间的Jaccard相似性?

3
我有一个大约有5万个不同行和2列的表格,可以将每一行视为一个电影,列是该电影的属性 - “ID”:该电影的ID,“Tags”:电影的一些内容标签,以每个电影的字符串列表形式
数据看起来像这样:
电影1,['浪漫','喜剧','英语']; 电影2,['动作','功夫','中文']
我的目标是首先根据它们对应的标签计算每部电影之间的杰卡德相似度,一旦完成,我就能知道每部电影(例如我选择的电影1)与此电影(本例中的电影1)最相似的其他前5个电影是什么。我想要的是不仅获取电影1本身的前5个结果,还要获取所有电影的前5个结果。
我尝试使用Python解决这个问题,但运行时间是一个大挑战。即使我使用了多进程,在6个核心上运行,总运行时间仍然超过20个小时。
以下是Python代码:

import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time

col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()

def jaccard_similarity(string1, string2):
    intersection = set(string1).intersection(set(string2))
    union = set(string1).union(set(string2))
    return len(intersection)/float(len(union))

def jc_results(movie_id):
    result=Counter()
    this_index=movie_ids.index(movie_id)
    for another_id in movie_ids:
        that_index=movie_ids.index(another_id)
        if another_id==movie_id:
            continue
        else:
            tag_1=tag_list[this_index]
            tag_2=tag_list[that_index]
            jaccard = jaccard_similarity(tag_1,tag_2)
            result[(movie_id,another_id)]=jaccard
    return result.most_common(10)


from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
    results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()

我想切换到Pyspark,但是我对spark python还很陌生,在写了几行代码后就卡住了,实际上我只能读取数据到RDD,使用sc.textFile...我已经阅读了现有的帖子,但它们都使用Scala...如果有人能提供帮助或指导Pyspark,那将非常好。非常感谢!


你解决了这个问题吗?我也在寻找类似的问题。如果你找到了任何解决方案,请告诉我。 - Anil Kumar
1
@AnilKumar,请查看我下面的答案。 - Bob Swain
1个回答

3
您可以尝试类似于这个stackoverflow答案的解决方案,但是由于您的数据已经被分词(一个字符串列表),因此您不需要执行该步骤或ngram步骤。
我还要提一下,在pyspark中,approxDistJoin计算的是Jaccard距离而不是Jaccard相似度,但是如果您需要特别转换为相似度,则可以从1中减去。
您的代码最终将类似于:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

使用相应的输出:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+

请问您能否解释一下numHashTables表示什么,以及为什么选择了10? - Shirin Yavari
@ShirinYavari 这个链接解释得还可以:https://cran.r-project.org/web/packages/textreuse/vignettes/textreuse-minhash.html。总的来说,我选择了10个,这个数字有点随意。10表示代码将从每个记录中创建10个独立的minhash值,然后approxSimilarityJoin将比较每行的这10个值,查看每行中有多少值相同而不同,并生成相似度分数。如果你选择少于10个,行之间计算的相似度会不够准确;如果你选择多于10个,则会更准确。 - Bob Swain
非常感谢你的解释和链接。我一定会仔细研究的。 - Shirin Yavari
@BobSwain 你好,能否帮我看一下我的问题?我可以运行这个小数据集,例如100条记录,但是当我尝试处理1万条记录时,就会遇到这个错误。https://stackoverflow.com/questions/73968030/failed-to-execute-user-defined-functionlshmodellambda - Chris_007

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接