使用可迭代对象和多个参数的Python多进程处理

3
使用多进程,我想传递一个可迭代对象和多个参数: a) 运行在n_core CPU上的函数 b) 每次产生(或返回)n_core个结果 c) 以任何完成顺序
from multiprocessing import Pool 

def func(iterable, args):
    this, that, other = args[0], args[1], args[2]

    for s in iterable:
        return ' '.join([s, this, that, other])        

def main():
    iterable = ['abc', 'bcd', 'cde', 'def', 'efg', 'fgh', 'ghi', 'hij']
    args = ['this', 'that', 'other']
    n_core = 2

    p = Pool(n_core)
    for r in p.imap_unordered(func, iterable, args):
        print(r)

if __name__ == '__main__':
    main()

预期结果为:
"abc this that other"
"bcd this that other"
"cde this that other"
"def this that other" 
"efg this that other" 
"fgh this that other"
"ghi this that other" 
"hij this that other"

如何正确实现这个功能?

其次,对于这个问题,concurrent.futures.ProcessPoolExecutor是否是更好的选择?


你正在尝试为可迭代对象中的每个项调用一次func。func 应该期望一个单独的项作为输入,而不是整个可迭代对象。 - zeehio
2个回答

6
您可以创建一个new_iterable,将iterable中的值与args组合起来:
from multiprocessing import Pool

def func(args):
    iterable, this, that, other = args[0], args[1][0], args[1][1], args[1][2]
    return ' '.join([iterable, this, that, other])

def main():
    iterable = ['abc', 'bcd', 'cde', 'def', 'efg', 'fgh', 'ghi', 'hij']
    args = ['this', 'that', 'other']
    new_iterable = ([x, args] for x in iterable)
    n_core = 2

    p = Pool(n_core)
    for r in p.imap_unordered(func, new_iterable):
        print(r)

if __name__ == '__main__':
    main()

输出

abc this that other
bcd this that other
cde this that other
def this that other
efg this that other
fgh this that other
ghi this that other
hij this that other

这个解决方案使用生成器表达式创建一个新的可迭代对象,将iterable中的条目与所需的args组合起来。您也可以使用生成器函数来实现相同的功能。 更新: 我修改了func以生成您在评论中提到并添加到问题中的预期结果。

谢谢,这很酷。但是结果应该是:“abc这个那个其他的”,“bcd这个那个其他的”,“cde这个那个其他的”,以此类推。 - Henry Thornton
啊,应该删除“for s in iterable”,只留下“return ' '.join([iterable, this, that, other])”。现在它可以工作了。我会等待其他回复,然后接受。再次感谢。 - Henry Thornton
@HenryThornton:我更新了答案以生成预期的结果。 - user3657941

2

这个问题中的代码似乎有误。函数 func 应该只接受一个参数,而不是整个可迭代对象。

应该改为:

def func(iterable, args):
    this, that, other = args[0], args[1], args[2]

    for s in iterable:
        return ' '.join([s, this, that, other])        

您可以使用以下方法:

def func(item, args):
    this, that, other = args[0], args[1], args[2]
    return ' '.join([item, this, that, other])        

除了这个错误之外,imap_unordered 不接受多于一个参数。
以下代码可以实现你所期望的功能:
try:
    from itertools import izip
except ImportError:  # Python 3 built-in zip already returns iterable
    izip = zip

from itertools import repeat
from multiprocessing import Pool

def func_star_single(func_item_args):
    """Equivalent to:
       func = func_item_args[0]
       item = func_item_args[1]
       args = func_item_args[2:]
       return func(item,args[0],args[1],...)
    """
    return func_item_args[0](*[func_item_args[1]] + func_item_args[2])


def func(item, args):
    this, that, other = args[0], args[1], args[2]
    return ' '.join([item, this, that, other])    


def main():
    iterable = ['abc', 'bcd', 'cde', 'def', 'efg', 'fgh', 'ghi', 'hij']
    args = ['this', 'that', 'other']
    n_core = 2

    p = Pool(n_core)
    for r in p.imap_unordered(func_star_single, izip(repeat(func), iterable, repeat(list([args])))):
        print(r)

if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接