使用Pathos进行Python多进程处理

20

我希望使用Python的pathos将计算分配到不同的进程中,以便在多核处理器上加速它。我的代码组织如下:

class:
   def foo(self,name):
    ...
    setattr(self,name,something)
    ...
   def boo(self):
      for name in list:
         self.foo(name)

由于使用multiprocessing.Pool时遇到了一些问题,我决定尝试使用pathos。 我按照之前的帖子中提到的建议进行了尝试:

import pathos.multiprocessing

但是结果出现错误:找不到最新版的pathos中的模块multiprocessing。

然后我尝试修改方法“boo”:

def boo(self):
 import pathos
 pathos.pp_map.pp_map(self.foo,list)

现在没有错误抛出,但是foo不起作用,我的类实例没有新属性。请帮帮我,因为我已经花了一整天的时间,但是还是不知道接下来该怎么办。

2个回答

43

我是 pathos 的作者。从你上面的代码中,我不确定你想做什么。 但是,我可以帮你澄清一些问题。下面是一些类似的代码:

>>> from pathos.multiprocessing import ProcessingPool
>>> class Bar:
...   def foo(self, name):
...     return len(str(name))
...   def boo(self, things):
...     for thing in things:
...       self.sum += self.foo(thing)
...     return self.sum
...   sum = 0
... 
>>> b = Bar()
>>> results = ProcessingPool().map(b.boo, [[12,3,456],[8,9,10],['a','b','cde']])
>>> results
[6, 4, 5]
>>> b.sum
0

以上发生的是,调用了 Bar 实例 bboo 方法,将 b.boo 传递给一个新的 Python 进程,并对每个嵌套列表进行求值。您可以看到结果是正确的... len("12")+len("3")+len("456") 是6,以此类推。

然而,当你查看b.sum时,你也能看到它仍然是神秘的0。为什么b.sum仍然是零?那么,multiprocessing(因此也包括pathos.multiprocessing)所做的是制作一个复制品,并将其通过map传递到其他Python进程中...然后调用复制的实例(并行地)并返回被调用方法调用的任何结果。请注意,您必须返回结果、打印、记录或将它们发送到文件或其他地方。它们不能像您期望的那样回到原始实例,因为发送到其他处理器的不是原始实例,而是该实例的副本,每个副本都有其自己的sum属性增加,但原始的`b.sum'没有改变。

然而,在pathos内部有计划使上述内容按您期望的方式工作——即原始对象被更新,但目前还不能这样工作。

编辑:如果您正在使用pip安装,请注意,发布的最新版本pathos已经有好几年了,可能无法正确安装或者无法安装所有子模块。新的pathos版本正在等待发布,但在此之前,最好从github获取最新版本的代码,然后从那里安装。总体上来说,主干在开发中是相当稳定的。我认为您的问题可能是由于安装时“新”pip与“旧”pathos不兼容而导致未安装所有软件包。如果缺少pathos.multiprocessing,则最有可能是罪魁祸首。

在此处从github获取pathoshttps://github.com/uqfoundation/pathos


我在这里和 OP 有同样的问题。我可以执行 import pathos,但是 import pathos.multiprocessing 给我一个模块未找到的错误。可能的原因是什么? - sashkello
问题是,由于英语和最小代码示例的某些障碍,我不理解OP的问题。也许我可以尝试另一种方法。也许并没有安装所有依赖项。你能导入processing吗?从processing.pool导入池呢?从pathos.helpers导入mp_helper或ProcessPool呢?pp导入和从pathos.helpers导入pp_helper呢? - Mike McKerns
在这个包中,我有核心、主机、启动器、启动器SCP、启动器SSH、pp_map、服务器、隧道、工具、XMLRPC请求处理程序和XMLRPC服务器。就这些,没有助手,也没有多进程。 - sashkello
是的,现在它可以工作了。由于某些原因,官方的 tgz 文件缺少一些子模块。从 git 安装对我很有帮助。 - sashkello
2
@Brideau:我正在将pathos分解成更多的包(基本上是所有非标准依赖项),以确保一切都可以通过pip进行安装。新版本即将推出。 - Mike McKerns
显示剩余6条评论

0
这是我的做法 - 我将要并行运行的函数放在类外,并在调用pool.map时将对象作为参数传递。然后,我返回要重新分配的对象。
from pathos.multiprocessing import ProcessingPool


def boo(args):
    b, things = args
    for thing in things:
        b.sum += b.foo(thing)
    return [b, b.sum]

class Bar:
    def __init__(self):
       self.sum = 0
    def foo(self, name):
       return len(str(name))

pool = ProcessingPool(2)
b1 = Bar()
b2 = Bar()
print(b1, b2)

results = pool.map(boo, [[b1, [12,3,456]],[b2, ['a','b','cde']]])

b1, b1s = results[0]
b2, b2s = results[1]
print(b1,b1s,b1.sum)
print(b2, b2s, b2.sum)

输出:

(<__main__.Bar instance at 0x10b341518>, <__main__.Bar instance at 0x10b341560>)
(<__main__.Bar instance at 0x10b3504d0>, 6, 6)
(<__main__.Bar instance at 0x10b350560>, 5, 5)

请注意,在调用map之前,b1和b2不再与先前的相同,因为复制了它们以便传递,正如@Mike McKerns所描述的那样。 然而,由于它们被传递、返回并重新分配,它们所有属性的值都是完好无损的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接