我正在处理一个项目,基本上可以归结为解决矩阵方程
A.dot(x) = d
其中A
是一个大约为10,000,000乘以2,000的矩阵(我希望最终可以在两个方向上增加它的大小)。
A
显然无法放入内存中,因此必须将其并行化。我通过解决A.T.dot(A).dot(x) = A.T.dot(d)
来实现并行化。 A.T
将具有2000乘以2000的尺寸。可以通过将 A
和d
分成块A_i
和d_i
沿着行计算A_i.T.dot(A_i)
和A_i.T.dot(d_i)
,然后对这些进行求和就可以计算出来。非常适合并行化。我已经能够使用多进程模块实现了这一点,但由于内存使用情况,难以进一步扩展(增加A
的大小),而且不美观(因此不易于维护)。
Dask似乎是解决这两个问题的非常有前途的库,我已经尝试过了。我的A
矩阵很难计算:它基于大约15个不同的数组(大小等于A
中的行数),其中一些在迭代算法中用于评估相关勒让德函数。当块很小时(10,000行),建立任务图需要很长时间,而且需要大量内存(内存增加与调用迭代算法的时间相符)。当块较大时(50,000行),计算A.T.dot(A)
之前的内存消耗要小得多,但在计算时会迅速耗尽。我已经尝试过使用cache.Chest
,但它明显减慢了计算速度。
任务图必须非常大而复杂-调用A._visualize()
会崩溃。对于更简单的A矩阵,可以直接这样做(请参见@MRocklin的响应)。有没有办法简化它?
任何关于如何解决这个问题的建议都将不胜感激。
作为一个玩具例子,我尝试过
A = da.ones((2e3, 1e7), chunks = (2e3, 1e3))
(A.T.dot(A)).compute()
这也失败了,只有一个核心在运行时就用尽了所有内存。使用
chunks = (2e3, 1e5)
时,所有核心几乎立即启动,但是MemoryError
会在1秒钟内出现(我的电脑上有15 GB的内存)。chunks =(2e3,1e4)
效果更好,但最终也用尽了所有内存。编辑: 我划去了玩具示例测试,因为维度是错误的,并在剩余部分中纠正了维度。如@MRocklin所说,如果维度正确,它确实可以工作。我添加了一个问题,现在我认为它与我的问题更相关。
编辑2: 这是我试图做的事情的简化示例。问题在于,我认为在定义
A
中的列时涉及到了递归。import dask.array as da
N = 1e6
M = 500
x = da.random.random((N, 1), chunks = 5*M)
# my actual
A_dict = {0:x}
for i in range(1, M):
A_dict[i] = 2*A_dict[i-1]
A = da.hstack(tuple(A_dict.values()))
A = A.rechunk((M*5, M))
ATA = A.T.dot(A)
这似乎导致一个非常复杂的任务图,甚至在计算开始之前就占用了大量内存。
现在,我通过将递归放置在一个带有numpy数组的函数中,并且或多或少地执行
A = x.map_blocks(...)
,解决了这个问题。第二个注意点是,一旦我有了矩阵A的任务图,直接计算
A.T.dot(A)
似乎会出现一些内存问题(内存使用不太稳定)。因此,我明确地对它进行分块计算,并对结果求和。即使有了这些变通方法,dask在速度和可读性方面仍然有很大的优势。
chunks=(10000, 2000)
吗?你提供一个玩具示例的想法真的很有帮助,因为它让我可以在本地尝试一些东西。你能否提供另一个更能代表你问题的玩具示例? - MRocklinmap_blocks
,但可能有更优雅的方法来完成它... - sulkeh