使用for循环的多进程池

11

我有一个文件列表,将其传递到for循环中并执行了一系列函数。最简单的并行化方法是什么?不确定是否能在任何地方找到这个确切的东西,而且我认为我的当前实现是错误的,因为我只看到一个文件被运行。通过阅读一些文章,我认为这应该是一个完美的并行化案例。

旧代码类似于:

import pandas as pd
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
for file in filenames:
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

(错误地)并行化的代码

import pandas as pd
from multiprocessing import Pool
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
def multip(filenames):
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1,6):
            e = function4(c, d)
    c.to_csv('output.csv')

if __name__ == '__main__'
    pool = Pool(processes=4)
    runstuff = pool.map(multip(filenames))

我认为我想要做的是让每个核心(也许每个进程?)计算一个文件。我也这样做了。

multiprocessing.cpu_count()

我得到了8个(因为我的处理器有四个内核,所以这可能考虑了线程)。由于我总共大约有10个文件,如果我能每个进程处理一个文件,那将加快速度!同时,我希望在第一轮进程完成后,剩下的2个文件也能找到进程。

编辑: 为了更清楚,这些函数(如function1、function2等)还会在它们各自的文件中提供给其他函数(如function1a、function1b)。我使用import语句调用function1。

我得到了以下错误:

OSError: Expected file path name or file-like object, got <class 'list'> type

显然它不喜欢被传递一个列表,但我不想在if语句中只运行一个文件名[0],

1个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
14
import multiprocessing
names = ['file1.csv', 'file2.csv']
def multip(name):
     [do stuff here]

if __name__ == '__main__':
    #use one less process to be a little more stable
    p = multiprocessing.Pool(processes = multiprocessing.cpu_count()-1)
    #timing it...
    start = time.time()
    for file in names:
    p.apply_async(multip, [file])

    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

编辑:将if __name__ == '__main__'替换为以下内容。 这将运行所有文件:

修改后代码:

if __name__ == '__main__':

    p = Pool(processes = len(names))
    start = time.time()
    async_result = p.map_async(multip, names)
    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,