Spark Mlib FPGrowth 任务失败,出现内存错误。

5

我有一个非常简单的用例,但可能会有非常大的结果集。我的代码在pyspark shell上执行以下操作:

from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("/Users/me/associationtestproject/data/sourcedata.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.000001, numPartitions=1000)
# Perform any RDD operation
for item in model.freqItemsets().toLocalIterator():
    # do something with item

每当我通过调用count()或toLocalIterator来启动实际处理时,我的操作最终会出现内存不足错误。 FPGrowth没有将我的数据分区吗? 我的结果数据很大,即使是一个单个分区也会卡住我的内存吗? 如果是这样,有没有办法以“流”方式将RDD持久化到磁盘上,而不是试图将其保存在内存中?
感谢您提供任何见解。
编辑:FPGrowth的一个基本限制是整个FP树必须放入内存中。 因此,提高最小支持阈值的建议是有效的。
-Raj

你有多少内存?你有多少产品? - Alberto Bonsanto
嗨,Alberto:当本地运行时,我会给驱动程序12G的内存。我的输入文件相当大:有177468行,每行都有相当多的项。 - Raj
1个回答

3

问题很可能是支持阈值。当您设置一个非常低的值,就像这里一样(我不会称之为百万分之一频繁),您基本上放弃了下闭性质的所有好处。

这意味着考虑的项集数量呈指数增长,在最坏的情况下,它将等于2N-1m,其中N是项目数量。除非您有一个非常小的项目数量的玩具数据,否则根本行不通。

编辑:

请注意,使用约200K个事务(从评论中获取的信息)和支持阈值1e-6,您数据中的每个项集都必须频繁。因此,您在尝试枚举所有观察到的项集。


我同意你的评估。然而,我的希望是mlib能够扩展到大数据集。在查看FPGrowth.scala的内部结构时,我发现genFreqItems()方法执行了collect()。我想知道是否有一种方法可以重写它以避免完全收集或替换为本地迭代器。你有什么想法吗? - Raj
这并不重要。你根本无法通过指数复杂度获胜。即使你忽略了数学,也请花一点时间考虑一下。对于大约200K个交易和阈值1e-6,大小为1的每个项目集都是频繁的。每个可以在你的数据中找到的大小为2的项目集也将是频繁的。依此类推…… 即使你可以处理这种复杂性,它也不能提供任何有用的信息。 - zero323
嗨@zero323,我同意这不会有用。但在某些方面,这并不是重点。真正的问题是FPGrowth的这种实现无法扩展到那个规模。 - Raj
你是正确的。此算法需要整个FP树驻留在内存中。我必须提高阈值。 - Raj

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接