如何使用pyspark(2.1.0) LdA获取每个文档相关主题?

7
我正在使用pyspark的LDAModel从语料库中获取主题。我的目标是找到与每个文档相关的主题。为此,我尝试根据文档设置topicDistributionCol。由于我是新手,我不确定这列的目的是什么。
from pyspark.ml.clustering import LDA
lda_model = LDA(k=10, optimizer="em").setTopicDistributionCol("topicDistributionCol")
// documents is valid dataset for this lda model
lda_model = lda_model.fit(documents)
transformed = lda_model.transform(documents)

topics = lda_model.describeTopics(maxTermsPerTopic=num_words_per_topic)
print("The topics described by their top-weighted terms:")
print topics.show(truncate=False)

它列出了所有主题及其术语索引和术语权重。

enter image description here

以下代码将给我提供topicDistributionCol。这里每一行都是针对每个文档的。
print transformed.select("topicDistributionCol").show(truncate=False)

enter image description here

我希望能够像这样获取文档主题矩阵。使用pyspark的LDA模型是否可行?
doc | topic 
1   |  [2,4]
2   |  [3,4,6]

注意:我之前使用gensim的LDA模型完成了这个任务,使用以下代码。但是我现在需要使用pyspark的LDA模型。
texts = [[word for word in document.lower().split() if word not in stoplist] for document in documents]
dictionary = corpora.Dictionary(texts)

corpus = [dictionary.doc2bow(text) for text in texts]
doc_topics = LdaModel(corpus=corpus, id2word=dictionary, num_topics=10, passes=10)
## to fetch topics for one document
vec_bow = dictionary.doc2bow(text[0])
Topics = doc_topics[vec_bow]
Topic_list = [x[0] for x in Topics]
## topic list is [1,5]
3个回答

1

不要将数据从Spark中取出!这样做会使使用Spark的目的失去意义。如果您实际上不需要它,请使用scikit。

在@kevin提供的答案基础上,一旦您在LDA模型上运行了transform方法transformed = model.transform(input_data)以获取您的主题分布,您应该会看到一个带有此列的数据框:

+--------------------+
|  topicDistribution |
+--------------------+
|[8.11971897779803...|
|[0.00341037397939...|
|[0.00141274604502...|

这是一个测量列表,用于每一行(预测)到n维三角形单纯形形状中的所有顶点的距离,该形状是LDA模型。

您可以使用下面这个方便的小UDF自动映射每行的一个顶部主题,通过在上面的列中运行它来运行概率分布。

简而言之,我们正在查找列表中的最大概率,这是您的顶级主题。概率测量值与您的主题相同的顺序

from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f

@f.udf(returnType=IntegerType())
def top_topic(topic_dist):
    dict = {prob: i for i, prob in enumerate(topic_dist)}
    return dict.get(max(topic_dist))

如果您只想将分布中每行一个顶级主题进行映射,则时间复杂度为O(n)。

但是,如果您希望为每个文档建模多个关系,则可以按以下方式修改方法,以支持最多k个主题,这将给您带来O(n ^ k)的复杂度,其中k是您想要返回的主题数。因此,请保持k合理;)

from pyspark.sql.types import ArrayType, IntegerType
import pyspark.sql.functions as f

@f.udf(returnType=ArrayType(IntegerType()))
def top_3_topics(topic_dist):
    k = 3
    res = []
    mutable = list(topic_dist)
    dict = {prob: i for i, prob in enumerate(topic_dist)}
    for i in range(k):
        current = dict.get(max(mutable))
        res.append(current)
        mutable[current] = 0.0

    del mutable
    return res

完成后,您将得到这个:

+--------------------+-----+
|  topicDistribution |topic|
+--------------------+-----+
|[8.11971897779803...|   19|
|[0.00341037397939...|   12|
|[0.00141274604502...|   19|

或者使用这种方式(如果您想要多个主题,则使用ArrayType列):

+--------------------+------------+
|  topicDistribution |       topic|
+--------------------+------------+
|[8.11971897779803...|  [19, 1, 7]|
|[0.00341037397939...| [12, 16, 4]|
|[0.00141274604502...|[19, 16, 11]|

这是为了将主题列附加到您现有的结果中而设计的。

命名您的主题并自动将主题标签与您的结果连接:

from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
            StructField("topic", StringType(), True),
            StructField("topic_name", StringType(), True)])

data = [('0', 'politics'),
        ('1', 'global'),
        ('2', 'crime')]

        topics_data = self.spark.createDataFrame(data, schema)
        results = results.join(topics_data, 'topic', 'left')

1

使用 toPandas 可以帮助:

df_p = transformed.select('topicDistributionCol').toPandas()
df_p1 = df_p.topicDistribution.apply(lambda x:np.array(x))
df_p2 = pd.DataFrame(df_p1.tolist()).apply(lambda x:x.argmax(),axis=1)
df_p3 = df_p2.reset_index()
df_p3.columns = ['doc','topic']
df_p3

如果您使用了管道,该如何处理?我有一个管道,可以访问LDA阶段,但从那里开始,topicDistributionCol是一个Param对象,没有toPandas... - Emmanuel Murairi

0
我认为对这个问题有一个简单的答案。按照以下步骤操作:
transformed.take(10)

输出的最后一列将是“topicDistribution”,它是文档-主题分布。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接