用Python将数据作为后台进程写入磁盘

7

我有一个用Python编写的程序,基本上执行以下操作:

for j in xrange(200):
    # 1) Compute a bunch of data
    # 2) Write data to disk

1) 大约需要2-5分钟。
2) 大约需要1分钟。

请注意,要保留的数据太多而无法在内存中保存。

理想情况下,我希望以避免将CPU空闲的方式将数据写入磁盘。这在Python中可能吗?谢谢!

3个回答

11
你可以尝试像这样使用多进程来解决问题:
import multiprocessing as mp

def compute(j):
    # compute a bunch of data
    return data

def write(data):
    # write data to disk

if __name__ == '__main__':
    pool = mp.Pool()
    for j in xrange(200):
        pool.apply_async(compute, args=(j, ), callback=write)
    pool.close()
    pool.join()

pool = mp.Pool()将创建一个工作进程池。默认情况下,工作进程的数量等于您的计算机拥有的CPU内核数。

每个pool.apply_async调用都会将一个任务排队,由工作进程池中的一个工作进程运行。当有空闲的工作进程时,它会运行compute(j)。当工作进程返回一个值data时,主进程中的线程将运行回调函数write(data),其中data是工作进程返回的数据。

一些注意事项:

  • 数据必须是可序列化的,因为它是通过Queue从工作进程传回主进程的。
  • 不能保证工作进程完成任务的顺序与将任务发送到池中的顺序相同。因此,写入磁盘的数据顺序可能不对应于j从0到199的范围。解决这个问题的一种方法是使用sqlite(或其他类型的)数据库将数据写入,其中j是数据的一个字段。然后,当您希望按顺序读取数据时,可以使用SELECT * FROM table ORDER BY j
  • 使用多个进程将增加所需的内存量,因为由工作进程生成的数据和等待写入磁盘的数据在队列中累积。您可以尝试使用NumPy数组来减少所需的内存量。如果这不可能,那么您可能需要减少进程数:

pool = mp.Pool(processes=1) 

这样会创建一个工作进程(用于运行compute),而主进程则运行write。由于compute的执行时间比write长,因此队列不会出现多个要写入磁盘的数据块。但是,您仍然需要足够的内存来计算一个数据块,同时将另一个数据块写入磁盘。

如果您没有足够的内存来同时执行这两个操作,那么您别无选择——您的原始代码,即按顺序运行computewrite的方式,是唯一的方法。


为什么使用进程和写入文件只是IO操作,不会影响GIL? - Fernando Jorge Mota
1
计算需要花费2-5分钟的时间,相比之下仅需1分钟进行IO操作。如果机器有多个核心,可以通过将工作分配给各核心来加速计算。 - unutbu
谢谢您的回复。这非常有帮助。我正在处理NumPy数组。 - Joel Vroom

3
您可以使用类似 Queue.Queue(模块在这里:Queue)和 threading.Thread(或者如果只需要一个函数可使用 threading.start_new_thread),该模块在这里:threading。因为文件写入不是 CPU 密集型的,而是更多的 IO 操作(GIL 对其没有影响)。

2

简单的方法是只使用线程和队列。另一方面,如果计算部分不依赖于全局状态,并且您拥有具有多个CPU核心的机器,则更有效的方法是使用进程池

from multiprocessing import Pool

def compute_data(x):
    return some_calculation_with(x)

if __name__ == '__main__':
    pool = Pool(processes=4) # let's say you have quad-core, so start 4 workers

    with open("output_file","w") as outfile:
        for calculation_result in pool.imap(compute_data, range(200)):
        # pool.imap returns results as they come from process pool    
            outfile.write(calculation_result)  

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接