为什么我不能在类方法中使用Python模块concurrent.futures?

12

我希望我的类方法可以并行运行,但是它只产生了一些错误,我无法解决。

我的代码如下:

import concurrent.futures as futures

samples = ['asfd', 'zxcv', 'asf', 'qwer']

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(self.make_readdb, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    def make_readdb(self, samplename):
        return samplename, 1

test(samples)

如果我在Ubuntu机器上运行此代码,则会出现以下错误:

Traceback (most recent call last):
    File "/usr/lib/python3.2/multiprocessing/queues.py", line 272, in _feedsend(obj)
    _pickle.PicklingError: Can't pickle <class 'method'>: attribute lookup builtins.method failed

make_readdb 方法只是为了举例而简化的,但在实际代码中它是一个瓶颈,我需要将其并行化。


2
你的代码可以直接在Python 3.3上运行。 - jfs
我在Python 3.5.2上测试了我的代码,它运行良好。非常感谢你们的回答。 - user2028191
2个回答

4

根据文档

ProcessPoolExecutor类是一个Executor子类,它使用进程池异步执行调用。ProcessPoolExecutor使用multiprocessing模块,这使它可以避开全局解释器锁,但也意味着只能执行和返回可pickle化的对象

尝试使用ThreadPoolExecutor

我再次检查了您的代码,问题在于函数make_readdb是类test的成员。您能否重构并将此函数拆分出来?


3
多线程并没有起到任何帮助作用。由于cPython一次只运行一个线程,ThreadPoolExecutor使其变得更慢了。在这种情况下,单线程设计花费了约600秒,而ThreadPoolExecutor则要花费约1300秒。 - user2028191
2
你的工作是CPU绑定还是IO绑定?你在max_workers中使用了什么值?在调用ProcessPoolExecutor时,这将默认为您计算机中的处理器数量。 - Owen

1

self应该作为一个显式参数传递,即使在多个进程中也是如此。 像这样:

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(test.make_readdb,self, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    def make_readdb(self, samplename):
        return samplename, 1

但只有一个进程会真正运行。 因此,这可能是更好的编写方式: 不要在类中将自身传递给ProcessPoolExecutor

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(test.make_readdb, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    @staticmethod
    def make_readdb(samplename):
        return samplename, 1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接