我有一个大约有5万个不同行和2列的表格,可以将每一行视为一个电影,列是该电影的属性 - “ID”:该电影的ID,“Tags”:电影的一些内容标签,以每个电影的字符串列表形式。
数据看起来像这样:
电影1,['浪漫','喜剧','英语']; 电影2,['动作','功夫','中文']
我的目标是首先根据它们对应的标签计算每部电影之间的杰卡德相似度,一旦完成,我就能知道每部电影(例如我选择的电影1)与此电影(本例中的电影1)最相似的其他前5个电影是什么。我想要的是不仅获取电影1本身的前5个结果,还要获取所有电影的前5个结果。
我尝试使用Python解决这个问题,但运行时间是一个大挑战。即使我使用了多进程,在6个核心上运行,总运行时间仍然超过20个小时。
以下是Python代码:
数据看起来像这样:
电影1,['浪漫','喜剧','英语']; 电影2,['动作','功夫','中文']
我的目标是首先根据它们对应的标签计算每部电影之间的杰卡德相似度,一旦完成,我就能知道每部电影(例如我选择的电影1)与此电影(本例中的电影1)最相似的其他前5个电影是什么。我想要的是不仅获取电影1本身的前5个结果,还要获取所有电影的前5个结果。
我尝试使用Python解决这个问题,但运行时间是一个大挑战。即使我使用了多进程,在6个核心上运行,总运行时间仍然超过20个小时。
以下是Python代码:
import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time
col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()
def jaccard_similarity(string1, string2):
intersection = set(string1).intersection(set(string2))
union = set(string1).union(set(string2))
return len(intersection)/float(len(union))
def jc_results(movie_id):
result=Counter()
this_index=movie_ids.index(movie_id)
for another_id in movie_ids:
that_index=movie_ids.index(another_id)
if another_id==movie_id:
continue
else:
tag_1=tag_list[this_index]
tag_2=tag_list[that_index]
jaccard = jaccard_similarity(tag_1,tag_2)
result[(movie_id,another_id)]=jaccard
return result.most_common(10)
from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
results[movie_id] = res.get()
我想切换到Pyspark,但是我对spark python还很陌生,在写了几行代码后就卡住了,实际上我只能读取数据到RDD,使用sc.textFile...我已经阅读了现有的帖子,但它们都使用Scala...如果有人能提供帮助或指导Pyspark,那将非常好。非常感谢!