Spark LDA消耗过多内存。

15
我想使用Spark MLlib LDA来总结我的文档语料库。
我的问题设置如下:
- 约有100,000个文档 - 约有400,000个唯一单词 - 100个聚类
我有16台服务器(每个服务器有20个内核和128GB内存)。 当我使用OnlineLDAOptimizer执行LDA时,它会出现内存错误,建议我增加spark.driver.maxResultSize,例如: Total size of serialized results of 11 tasks (1302 MB) is bigger than spark.driver.maxResultSize 我将spark.driver.maxResultSize增加到120GB (并且也将spark.driver.memory增加到120GB),重新运行LDA但仍然报错。
它仍然说: Total size of serialized results of 11 tasks (120.1 GB) is bigger than spark.driver.maxResultSize 我尝试另一个数据集,大约有100,000个唯一的单词,它能正常工作。
那么,我该如何估计使用Spark MLlib LDA时的内存使用情况?我在官方文档中找不到任何规范。
请注意,我用稀疏向量构建了传递给LDA.run()的文档RDD[(Long, Vector)],但不知道Spark LDA是否能够正确处理稀疏格式。
(编辑) 我使用的是Scala版本的LDA。 不是Python版本。
这可能是相关的问题,但没有明确的答案。 Spark LDA woes - prediction and OOM questions (编辑)
这是我的代码片段(gist)。 https://gist.github.com/lucidfrontier45/11420721c0078c5b7415
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
    val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
        .flatMap {
            // input file's format is (user_id, product_name, count)
            case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
            case _ => None
        }.persist()

    // Map to convert user_id or product_name into unique sequencential id
    val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
    val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
    val inverse_userid_map = userid_map.map(_.swap)

    // broadcat to speedup RDD map operation
    val b_userid_map = sc.broadcast(userid_map)
    val b_productid_map = sc.broadcast(productid_map)
    val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

    // run map
    val transformed_src = src.map { case (u, p, r) =>
        (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
    }

    println("unique items = %d".format(b_productid_map.value.size))

    // prepare for LDA input RDD[(LONG, Vector)]
    val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
        .groupByKey()
        .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()

    documents.count()
    src.unpersist()

    // run Online Variational LDA
    val ldamodel = new LDA()
        .setK(args.k)
        .setMaxIterations(args.n_iter)
        .setOptimizer("online")
        .run(documents)
        .asInstanceOf[LocalLDAModel]


    val result = ldamodel.topicDistributions(documents)
        .map { case (i, v) =>
            val u = b_inverse_userid_map.value(i)
            "%d,%s".format(u, v.toArray.mkString(","))
        }
    result.saveAsTextFile(args.out)
}

实际上,我使用LDA对交易数据进行降维。我的数据格式为(u,p,r),其中u是用户ID,p是产品名称,r是用户up互动的次数。在这种情况下,用户对应文档,产品对应单词。由于用户ID和产品名称是任意字符串,因此在提交到LDA之前,我将它们转换为唯一的顺序整数。
谢谢。

作为一条注释,mllib LDA在训练时可以正确处理稀疏向量。 - Patrick the Cat
1
谢谢,最好将此放在一个问题中。LDA返回至少两个相对较大的本地对象topicsMatrix(#文档#聚类)和describeTopics(这与(#聚类#令牌* 2)成比例)。乍一看,它不应占用120GB,但仍然很多。 - zero323
这对于单个机器来说仍然是很多的。100K个单词时它消耗了多少内存?仅驱动程序。 - zero323
这是一个老问题,但我仍然遇到了这个问题(spark 1.6.1)- 我发现在spark管道中为HashingTF设置特征数为2^18-1(比默认值少一个)一切正常工作,我可以使用spark.driver.memory=1g保存整个管道。使用默认的2^18会导致保存操作产生堆空间OOM错误。我还尝试将spark.driver.memoryspark.driver.maxResultSize增加到8g,但对于2^18特征没有帮助 - 我不清楚是驱动程序还是运行内存不足。 - Matti Lyra
你提供给.saveAsTextFile( ... )的输出路径是什么? - Fokko Driesprong
显示剩余5条评论
1个回答

1

这个问题有三个常见原因,它们可能独立或同时起作用。

  1. 这个任务使用类似于collect的方式向驱动程序返回了大量数据。不幸的是,一些SparkML代码也是这样做的。如果不能将问题归咎于下面的(2)或(3),那么很可能是你的数据与OnlineLDAOptimizer实现交互的结果。

  2. 这个任务涉及大量的任务,每个任务都会作为Spark作业管理的一部分将结果返回给驱动程序(而不是使用类似于collect的方式)。检查SparkUI中的任务数量。还可以参见超过`spark.driver.maxResultSize`而没有将任何数据带到驱动程序。堆栈跟踪中是否有org.apache.spark.scheduler.TaskSetManager#canFetchMoreResultsorg.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask?

  3. 估计误差:Spark显着高估了即将返回到驱动程序的数据大小,并抛出此错误以防止集群的驱动程序OOM。参见什么是spark.driver.maxResultSize?。测试这种情况的一种方法是将spark.driver.maxResultSize设置为0(无限制)并观察发生了什么。

希望这能帮到您!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接