使用Python的多进程池和map_async函数

10

我试图在Python中使用多进程包multiprocessing和Pool。

我有一个被map_async函数调用的函数f:

from multiprocessing import Pool

def f(host, x):
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
pool = Pool(processes=5)
pool.map_async(f,hosts,"test")
pool.close()
pool.join()

这段代码出现了以下错误:

Traceback (most recent call last):
  File "pool-test.py", line 9, in <module>
    pool.map_async(f,hosts,"test")
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async
    result = MapResult(self._cache, chunksize, len(iterable), callback)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__
    self._number_left = length//chunksize + bool(length % chunksize)
TypeError: unsupported operand type(s) for //: 'int' and 'str'

我不知道如何向函数f传递超过1个参数,有什么方法吗?


你可以直接使用 pool.map,并且完全删除 "test" 虚拟变量。 - danodonovan
3个回答

13

"test" 被解释为 map_asyncchunksize 关键字参数 (参见文档).

你的代码应该是这样的(从我的IPython会话中复制粘贴):

from multiprocessing import Pool

def f(arg):
    host, x = arg
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
args = ((host, "test") for host in hosts)
pool = Pool(processes=5)
pool.map_async(f, args)
pool.close()
pool.join()
## -- End pasted text --

1.1.1.1
test
2.2.2.2
test

注意:在Python 3中,您可以使用starmap,它将从元组中解包参数。您将能够避免显式执行host,x = arg


我测试了一下,但结果不好;它打印出了两个主机,但只有“test”单词中的“t”和“e”。 - dseira
使用x=["test","test"]可以工作,但是这没有意义,因为想象一下主机列表大约有10000个,我只想要一个x来比较结果。拥有一个包含相同结果的10000个条目的x列表是不可行的。无论如何,谢谢。 - dseira
它不会将它们保存在内存中,而是动态创建它们,就像常规的for循环一样。在我看来,你不会有任何明显的惩罚。你想用那个列表实现什么? - F.X.
如果每个主机的参数都是相同的,那么您可以使用一个通用函数,不是吗? - F.X.
1
最后我使用了一个全局选项。它确实是一个静态变量。 - dseira
显示剩余4条评论

6

在Python 3中,池返回一个上下文管理器,因此可以使用with语句。这避免了异常问题,并且不需要关闭和连接。在这种情况下,函数始终接收变量x的常量值,因此可以使用部分求值来处理。map_async是惰性的,所以我们需要获取结果才能执行操作,最好直接使用map。因此:

from multiprocessing import Pool
from functools import partial

def f(host, x):
    print(host)
    print(x)

hosts = ('1.1.1.1', '2.2.2.2')
with Pool(processes=5) as pool:
    pool.map(partial(f, x='test'), hosts)

结果为:

1.1.1.1
测试
2.2.2.2
测试

1
据我回忆,Pool().map()和.map_async()只接受单个参数。可以通过传递列表来解决此限制,但当然需要设计一个专门接受列表对象的定制函数。
一种方法是编写自定义代码 - 即通用的“函数+参数”包装器。我想出了类似于这样的东西(注意:仅部分测试):
def tmp_test():
    # a short test script:
    #
    A=[[1,2], [2,3], [4,5], [6,7]]
    P=mpp.Pool(mpp.cpu_count())
    X=P.map_async(map_helper, [[operator.eq]+a for a in A])
    #
    return X.get()


def null_funct(args=[], kwargs={}):
    # a place-holder 
    pass
#
def map_helper(args_in = [null_funct, [], {}]):
    # helper function for pool.map_async(). pass data as a list(-like object):
    # [function, [args], {kwargs}] (though we'll allow for some mistakes).
    #
    funct = args_in[0]
    #
    # allow for different formatting options:
    if not (isinstance(args_in[1], list) or isinstance(args_in[1], tuple) or isinstance(args_in[1], dict)):
        # probably passed a list of parameters. just use them:
        args = args_in[1:]
        #
        return funct(*args)
    #
    # if the args are "properly" formatted:
    args=[]
    kwargs = {}
    for arg in args_in[1:]:
        # assign list types to args, dict types to kwargs...
        if isinstance(arg, list) or isinstance(arg, tuple): args += arg
        if isinstance(arg, dict): kwargs.update(arg)
    return funct(*args, **kwargs)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接