Python Multiprocessing Pool - iterating over object methods?

7

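Maybe someone more familiar with Python's multiprocessing Pool code can help me out. I am trying to connect to several hosts on my network at the same time (N at a time) over a socket connection and run some RPCs on each. When one host finishes, I want the next host added to the pool, and so on until every host has been processed.

I have a class, HClass, with some methods that do this, and a list of hostnames in hostlist. However, I have not been able to make sense of any of the Pool examples on docs.python.org well enough to get this working.

Here is a short snippet of what I have so far: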

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
  def __init__(self, hostname="default"):
    self.hostname = hostname

  def go(self):
      # do stuff
      # do more stuff
  ....

if __name__ == "__main__":
  objs = [HClass(hostname=current_host) for current_host in hostlist]
  pool = multiprocessing.pool(poolsize)
  results = pool.apply_async(objs.go())

So far, all I get is this traceback:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed

The process then just hangs until I kill it with Control-C.

3 Answers

7

I would try to keep interprocess communication to a minimum. It looks like all you really need to send is the hostname string:

for host in hostlist:
    pool.apply_async(worker, args = (host,), callback = on_return)

For example:
import multiprocessing as mp
import time
import logging

logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4']*3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        logger.info('processing {h}'.format(h = self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    h = HClass(hostname = host)
    return h.go()

result = []
def on_return(retval):
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        pool.apply_async(worker, args = (host,), callback = on_return)
    pool.close()
    pool.join()
    logger.info(result)
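
When the per-host results only need to be collected into a list, the same idea can be written more compactly with pool.map. This is a minimal sketch, assuming Python 3 (where Pool works as a context manager); the worker body and the "done: " prefix are placeholders standing in for HClass(hostname=host).go(), not part of the original code.

```python
import multiprocessing as mp


def worker(host):
    # Placeholder for HClass(hostname=host).go(); a real worker would
    # open the socket and run the RPCs here.
    return "done: " + host


if __name__ == "__main__":
    hostlist = ["h1", "h2", "h3", "h4"]
    poolsize = 2
    with mp.Pool(poolsize) as pool:
        # chunksize=1 hands out one host at a time, so an idle process
        # picks up the next host as soon as it finishes the previous one.
        # map() blocks until all hosts are done and keeps input order.
        results = pool.map(worker, hostlist, chunksize=1)
    print(results)
```

If results should be handled as each host completes rather than all at once, pool.imap_unordered(worker, hostlist) takes the same arguments and yields results in completion order.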

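unutbu, you're absolutely right. I think I was trying to skip steps / make things simpler (in my head, at least) by not moving that go() function (your worker) outside of the class. Thanks! - jfofo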

1
I agree with @unutbu's solution... simpler is better. However, if you do have to send the class method go, I'd use pathos.multiprocessing instead of multiprocessing:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
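>>> x = [0, 1, 2, 3]  # example inputs (assumed; chosen to match the output shown)
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)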
[4, 6, 8, 10]

Get the code here: https://github.com/uqfoundation/pathos
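
As a side note for readers on Python 3: the traceback in the question comes from Python 2.7, where bound methods could not be pickled. On Python 3, bound methods of picklable, module-level classes can be pickled, so the standard multiprocessing module may be enough on its own. The following is a rough sketch under that assumption; the go() body here is a stand-in that only upper-cases the hostname.

```python
import multiprocessing as mp


class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # Stand-in for the real socket/RPC work.
        return self.hostname.upper()


if __name__ == "__main__":
    objs = [HClass(hostname=h) for h in ["h1", "h2", "h3"]]
    with mp.Pool(2) as pool:
        # Pass the bound method itself (obj.go), not the result of
        # calling it (obj.go()); the pool calls it in a worker process.
        pending = [pool.apply_async(obj.go) for obj in objs]
        results = [p.get() for p in pending]
    print(results)
```

Each call pickles the whole object and sends it to a worker process, so this costs more interprocess traffic than sending just the hostname string, and any state changed inside go() stays in the worker's copy.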

