Python多进程apply_async只使用一个进程。

19

我有一个脚本,其中包括从列表中打开文件,然后对该文件中的文本执行某些操作。我正在使用Python multiprocessing和Pool尝试并行化此操作。该脚本的一个抽象如下:

import os
from multiprocessing import Pool

results = []
def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()
当我运行这段代码时,每次迭代中进程ID的输出都相同。基本上我的目标是将输入列表的每个元素分叉到单独的进程中,但似乎一个进程在处理所有工作。
2个回答

32
  • apply_async把一个任务交由进程池处理。如果要利用更多的处理器,你需要多次调用apply_async
  • 不要让两个进程同时尝试向同一个列表results写入数据。因为进程池中的工作进程是独立的进程,所以它们不会向同一个列表中写入数据。解决这个问题的一种方法是使用输出队列(output Queue)。你可以自己设置输出队列,或使用apply_async的回调(callback)函数来为你设置输出队列。apply_async将在函数执行完成后调用回调函数。
  • 你也可以使用map_async代替apply_async,但这样你会得到一个嵌套列表,你还需要展开(flatten)它。

因此,建议你尝试以下方式:

import os
import multiprocessing as mp

results = []   

def testFunc(file):
    result = []
    print "Working in Process #%d" % (os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result


def collect_results(result):
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)

调用 p.join() 后没有任何反应。 - Soren

8
也许在这种情况下,您应该使用map_async:
import os
from multiprocessing import Pool

results = []
def testFunc(file):
    message =  ("Working in Process #%d" % (os.getpid()))
    #This is just an illustration of some logic. This is not what I'm actually doing.
    for line in file:
        if 'dog' in line:
            results.append(line)
    return message

if __name__=="__main__":
    print("saddsf")
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.map_async(testFunc, files)
    print(results.get())

1
如果你打算立即使用results.get(),那么可能只需要使用map - mgilson
我感谢您的回答,但出于各种原因,我想坚持使用apply_async。 - user1074057

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接