在pyspark中计算数据框的所有行之间的余弦相似度

13

我有一个包含工人的人口统计信息,如年龄、性别、地址等以及他们的工作地点的数据集。我从数据集中创建了一个RDD,并将其转换为DataFrame。

每个ID都有多个条目。因此,我创建了一个仅包含工人ID和他/她曾经工作过的各个办公室位置的DataFrame。

    |----------|----------------|
    | **ID**    **Office_Loc**  |
    |----------|----------------|
    |   1      |Delhi, Mumbai,  |
    |          | Gandhinagar    |
    |---------------------------|
    |   2      | Delhi, Mandi   | 
    |---------------------------|
    |   3      |Hyderbad, Jaipur|
    -----------------------------

我希望计算每个工人之间基于他们的办公地点位置的余弦相似度。

因此,我迭代了数据帧的行,从中检索出一个单独的行:

myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())

接着使用 map 函数

    cos_weight = ID_place_df.select("ID","office_location").rdd\
  .map(lambda x: get_cosine(values,x[0],x[1]))

计算提取行和整个DataFrame之间的余弦相似度。

我认为我的方法不是一个好方法,因为我正在迭代DataFrame的行,这违背了使用spark的初衷。 在pyspark中有更好的方法吗? 请给予指导。


1
我认为这是一个有点长的问题。通常最好的做法是用最简单的情况来提问,以确保你得到的是相同的问题。 - ChaosPredictor
2个回答

38
你可以使用mllib包来计算每行TF-IDF的L2范数。然后将表格自乘,以得到余弦相似度,即两个L2范数的点积: 1. RDD
rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
  • 计算TF-IDF

documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))

from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF()
tf = hashingTF.transform(documents)

您可以在 HashingTF 中指定特征数以使特征矩阵更小(列数更少)。

    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
  • 计算L2范数:

from pyspark.mllib.feature import Normalizer
labels = rdd.map(lambda l: l[0])
features = tfidf

normalizer = Normalizer()
data = labels.zip(normalizer.transform(features))
  • 通过将矩阵与自身相乘来计算余弦相似度:

  • from pyspark.mllib.linalg.distributed import IndexedRowMatrix
    mat = IndexedRowMatrix(data).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    
        array([[ 0.        ,  0.        ,  0.        ,  0.        ],
               [ 0.        ,  1.        ,  0.10794634,  0.        ],
               [ 0.        ,  0.10794634,  1.        ,  0.        ],
               [ 0.        ,  0.        ,  0.        ,  1.        ]])
    

    或者: 使用numpy数组的笛卡尔积和函数dot

    data.cartesian(data)\
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
        .sortByKey()\
        .collect()
    
        [((1, 1), 1.0),
         ((1, 2), 0.10794633570596117),
         ((1, 3), 0.0),
         ((2, 1), 0.10794633570596117),
         ((2, 2), 1.0),
         ((2, 3), 0.0),
         ((3, 1), 0.0),
         ((3, 2), 0.0),
         ((3, 3), 1.0)]
    

    2. DataFrame

    由于您似乎在使用数据框,因此可以使用spark ml包代替:

    import pyspark.sql.functions as psf
    df = rdd.toDF(["ID", "Office_Loc"])\
        .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
    
    • 计算TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
    tf = hashingTF.transform(df)
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
    tfidf = idf.transform(tf)
    
  • 计算L2范数:

    from pyspark.ml.feature import Normalizer
    normalizer = Normalizer(inputCol="feature", outputCol="norm")
    data = normalizer.transform(tfidf)
    
  • 计算矩阵乘积:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\
            .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    

    或:使用连接和 UDF 函数 dot

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
        .select(
            psf.col("i.ID").alias("i"), 
            psf.col("j.ID").alias("j"), 
            dot_udf("i.norm", "j.norm").alias("dot"))\
        .sort("i", "j")\
        .show()
    
        +---+---+-------------------+
        |  i|  j|                dot|
        +---+---+-------------------+
        |  1|  2|0.10794633570596117|
        |  1|  3|                0.0|
        |  2|  3|                0.0|
        +---+---+-------------------+
    
  • 本教程列举了不同的方法来对大规模矩阵进行乘法运算:https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e


    谢谢您的回答。我非常感激您的帮助。但是在使用数据框进行哈希TF转换时,代码出现了错误requirement failed: The input column must be ArrayType, but got StringType.' - Abhinav Choudhury
    1
    你必须先将字符串列表拆分为单词列表。我添加了如何创建 df 的部分。 - MaFF
    尝试将numFeatures设置为数据框中不同城市的数量,默认值为262144,这将是块矩阵中的列数(我已将其设置为10以供您提供的示例数据)。使用笛卡尔积点积也可以。请查看链接以了解大矩阵乘法。 - MaFF
    当我使用288个ID进行点积笛卡尔积并将其转换为DataFrame result=data.cartesian(data)\ .map(lambda l: ((l[0][0]), (l[1][0]), l[0][1].dot(l[1][1])))\ .toDF()时,我会收到以下错误信息:不支持的类型:<type 'numpy.float64'>。但是,当我尝试使用10个条目的小例子时,我没有遇到这个错误。 - Abhinav Choudhury
    为什么dot.toLocalMatrix().toArray()产生的数组大小是4 x 4,而不是3 x 3,因为有三个标签(1、2、3)? - kitchenprinzessin
    显示剩余5条评论

    3
    关于这个问题,由于我正在一个使用pyspark的项目中,而我必须使用余弦相似度,因此我不得不说,@MaFF的代码是正确的。实际上,当我看到他的代码时,我犹豫了,因为他在使用向量的L2范数的点积,并且理论上说:数学上,它是向量的点积和两个向量的大小乘积的比率。
    以下是我用同样的结果调整过的代码,所以我得出结论SKLearn以不同的方式计算tfidf,因此如果你尝试使用sklearn重放这个练习,你将得到不同的结果。
    d = [{'id': '1', 'office': 'Delhi, Mumbai, Gandhinagar'}, {'id': '2', 'office': 'Delhi, Mandi'}, {'id': '3', 'office': 'Hyderbad, Jaipur'}]
    df_fussion = spark.createDataFrame(d)
    df_fussion = df_fussion.withColumn('office', F.split('office', ', '))
    
    
    from pyspark.ml.feature import HashingTF, IDF
    hashingTF = HashingTF(inputCol="office", outputCol="tf")
    tf = hashingTF.transform(df_fussion)
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
    data = idf.transform(tf)   
    
    @udf
    def sim_cos(v1,v2):
        try:
            p = 2
            return float(v1.dot(v2))/float(v1.norm(p)*v2.norm(p))
        except:
            return 0
    
    result = data.alias("i").join(data.alias("j"), F.col("i.ID") < F.col("j.ID"))\
        .select(
            F.col("i.ID").alias("i"),
            F.col("j.ID").alias("j"),
            sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
        .sort("i", "j")
    result.show()
    

    我还想和您分享一些我用简单向量做的测试,结果是正确的:

    在Pyspark中为两个简单向量计算余弦相似度

    此致敬礼。


    谢谢您分享代码。我可以问一下您处理的数据集有多大吗?计算具有1000个特征和10000行的df相似性花费了相当长的时间。我想知道是否有方法可以加快这个过程? - Joe
    我不得不处理大约10万行的数据集。如果你想将它们相互比较,你知道N行可能的合作伙伴数量是N*(N-1)/2,也就是说,有5*(10**9)种可能的组合。因此,我建议您按照您认为代表性较强的某些特征对其进行分组,以隔离少量的行组,并将此技术应用于这些组。 - pabloverd

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接