Python 多进程处理海量数据

3

我已经在网站上搜索,但不确定哪些术语会产生相关答案,如果这个问题是重复的,我深表歉意。

我需要处理一个非常大的矩阵(14,000,000 * 250,000),并希望利用Python的multiprocessing模块加速处理。对于矩阵中的每一对列,我需要应用一个函数,然后将结果存储在专有类中。

我将实现一个双重四循环,提供所需的列组合。

我不想将池子里负载250,000个任务,因为我担心内存使用量会很大。理想情况下,我希望有一列,然后将其分配到池中,即进程1获取A列和B列,然后函数F获取A,B和G并将结果存储在Class G[A,B]中,进程2获取A列和C列,以此类推。

进程将永远不会访问G的相同元素。

因此,我希望每N个任务暂停循环。G的set/get方法将被覆盖以执行一些后端任务。

我不明白的是是否有必要暂停循环?即Python是否足够聪明以仅获取它可以处理的内容?还是将填充大量的任务?

最后,我不清楚结果的工作方式。我只希望将它们设置在G中,并且不返回任何内容。我不想担心.get()等问题,但据我所知,池方法返回结果对象。我可以忽略这个吗?

有更好的方法吗?我完全迷失了吗?


你能展示一下你的代码吗? - univerio
目前没有什么可以展示的。我只是在尝试弄清楚 MP 模块的工作原理。 - Michael Chase
如果您想手动处理(使用循环)大量数据,则Python不是最佳选择。即使您使用“多进程”,解释器开销也太高了。如果您正在进行数值计算,可以尝试使用NumPy。 - loopbackbee
感谢您的回复。Numpy将在后台处理一些事情(即列将是np.matrix)。问题是,由于内存限制和矩阵结构的特定实现,我需要非常特定的矩阵函数。 - Michael Chase
我相信你所寻找的是生产者-消费者模式,当你提到“暂停”你的for循环时。 - univerio
至于你问题的第二部分,你不能在进程之间共享任意对象。这意味着你不能直接从工作进程中更改 G。你需要查看从池工作进程返回的结果。 - univerio
1个回答

2
首先,您需要创建一个多处理池类。您可以设置要使用的工作进程数量,然后使用“map”启动任务。我相信您已经知道,但这是Python multiprocessing文档。
您说您不想返回数据,因为您不需要,但您计划如何查看结果呢?每个任务是否会将数据写入磁盘?要在进程之间传递数据,您需要使用类似于multiprocessing queue的东西。
以下是链接中如何使用进程和队列的示例代码:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

以下是使用池(Pool)的示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

编辑:@goncalopp指出,由于Python的速度较慢,您可能不想在Python中进行大量的数值计算。 Numpy是一个非常好的用于数值计算的软件包。

如果您因每个进程都要写入磁盘而受到严重的IO限制,那么您应该考虑运行类似于4*num_processors这样的内容,以便始终有事情可做。您还应该确保拥有非常快的磁盘:)


谢谢您的回复。我的顾虑是我不想让池子过大;Python会根据实例能够处理的任务来选择任务吗?还是会将所有任务加载到池子里然后开始处理?结果会存储在其他地方,我并没有意图查看它们,只是对它们进行更多的操作。 - Michael Chase
我认为您认为进入队列的内容和我们认为的不一致。为了控制您的内存使用,您应该根据需要将列群加载到队列中(例如,保持长度为10)。然后,您的工作者(例如每个处理器一个)将从队列中取出项目并进行处理。我们仍然不理解的是,您想要对新修改的结构执行什么操作(例如“写入磁盘”)。 - Paul Seeb
谢谢Ben,这很有帮助。如果我使用apply_async,我需要.get()以便进程继续进行下一个任务吗?我注意到它需要一个回调函数,如果它只返回空值,那对我的目的是否足够? - Michael Chase
他说:“G的set/get方法将被重写以执行一些后端任务。” 这意味着数据可能会保存在数据库或磁盘中。 我假设他的主要过程将加载矩阵数据,并且具有拉取、处理和卸载内存的任务。 - Ben Echols
1
请注意,默认情况下,“池”不会启动超过机器核心数的工作进程。太多的工作进程只会导致它们争夺资源而不是执行有用的工作。因此,我建议您从默认数量开始,并在逐渐增加到2倍核心数的过程中进行吞吐量测试。看看性能何时达到峰值。 - Roland Smith
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接