Python中类内的池化

12

我想在一个类中使用池(Pool),但是出了问题。我的代码比较长,我创建了一个小型的演示版本来说明问题。如果您能为我提供下面代码的可工作变体,那就太好了。

from multiprocessing import Pool

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1,2,3]
    def F(self, x):
        return x * x
    def run(self):
        p = Pool()
        print p.map(self.F, self.numbers)


ins = SeriesInstance()
ins.run()

输出:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

然后就停止响应了。

4个回答

16

看起来由于函数被传递给工作线程的方式(通过 pickling),不幸的是您不能使用实例方法。我的第一个想法是使用lambda函数,但事实证明内置的pickler也无法序列化它们。解决方案遗憾地是只能在全局命名空间中使用函数。如其他答案所建议的那样,您可以使用静态方法并传递self使其看起来更像是实例方法。

from multiprocessing import Pool
from itertools import repeat

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1,2,3]

    def run(self):
        p = Pool()
        squares = p.map(self.F, self.numbers)
        multiples = p.starmap(self.G, zip(repeat(self), [2, 5, 10]))
        return (squares, multiples)

    @staticmethod
    def F(x):
        return x * x

    @staticmethod
    def G(self, m):
        return [m *n for n in self.numbers]

if __name__ == '__main__':
    print(SeriesInstance().run())

谢谢,但似乎出现了问题。当我在我的大型代码中使用这个原则时,经过几次迭代后它会崩溃,并显示以下错误:“OSError: [Errno 35] 资源暂时不可用”。 - user58925
1
我认为错误是由于创建太多进程时的操作系统错误导致的。不过这只是我的猜测,看起来你需要在使用完连接池后正确关闭它们。根据你实际的代码情况,如果可以的话,请提供代码,你可以使用一个传递为参数的单一连接池或者对于每个SeriesInstance,在使用完连接池后必须关闭它们。 - Alex Sherman
通常情况下,您应该通过“p = mp.Pool(mp.cpu_count())”来限制池的大小。这个方法非常完美。 - Steve Lihn
如果函数F包含类成员,会怎么样? - Livne Rosenblum
需要注意的是,此示例中池未被正确清除。我们应该使用 with Pool() as p: 而不是 p = Pool()。如果我们打算调用 run() 多次,则应该将池的创建从方法中移出,以便可以重复使用,而不是每次都创建一个新的。 - Neil Traft

4

您还可以在类中使用静态函数来进行多进程处理。


我认为这是与问题最匹配的最简单的解决方案。 - namespace-Pt

2

您出现了错误,因为pickle无法序列化实例方法。因此,您应该使用这个小技巧:

from itertools import repeat
from multiprocessing import Pool


class SeriesInstance:
    def __init__(self):
        self.numbers = [1, 2, 3]

    def F(self, x):
        return x * x

    def run(self):
        p = Pool()
        print(list(p.starmap(SeriesInstance.F, zip(repeat(self), self.numbers))))


if __name__ == '__main__':
    SeriesInstance().run()


1
欢迎来到SO!您可以通过一些解释来提高您的答案质量。 - Timus
这个例子将使用所有核心来加速实例方法的计算。它有效运行。 - Steve Lihn
该方法的缺点是我们将序列化整个类。在真实的例子中,类中可能有其他不需要的数据成员,只会在序列化期间增加负担。例如,使用此方法,每个工作线程都会接收所有的“数字”,而不仅仅是它需要处理的一个数字。除此之外,我喜欢这个技巧的简单性。 - Neil Traft
需要注意的是,在这个例子中,池没有被正确地清理。 (在原始帖子中也没有)。 - Neil Traft

0

在stackoverflow上有很多帖子关于这个问题由于不同的原因而发生。在我的情况下,我试图从类的另一个函数中调用pool.starmap。将其变为staticmethod或者让类外的函数调用它都不起作用,并且会给出相同的错误。类实例无法被pickle化,因此我们需要在启动多进程后创建实例。

最终我做的是将我的类分成两个类。类似于这样:

from multiprocessing import Pool

class B:
    ...
    def process_feature(idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff():
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接