多进程池 - PicklingError:无法pickle <type 'thread.lock'>:属性查找thread.lock失败

51

multiprocessing.Pool让我疯了...
我想要升级很多软件包,对于每个软件包,我都需要检查是否有更高版本。这可以通过check_one函数来完成。
主要代码在Updater.update方法中:我在那里创建Pool对象并调用map()方法。

以下是代码:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

当我运行它时,出现了这个奇怪的错误:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
3个回答

33

multiprocessing通过mp.SimpleQueue将任务(包括check_onedata)传递给工作进程。与Queue.Queue不同,所有放入mp.SimpleQueue中的内容都必须是可序列化的。 Queue.Queue不可序列化:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

产生了以下异常:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

您的data包含一个packages,它是一个Queue.Queue。那可能是问题的根源。


下面是可能的解决方法:该Queue用于两个目的:

  1. 通过调用qsize来查找近似大小。
  2. 存储结果以供稍后检索。

为了在多个进程之间共享值,我们可以使用mp.Value代替调用qsize

不需要再将结果存储在队列中,只需从check_one调用返回值即可(应该这样做)。pool.map会将结果收集到自己创建的队列中,并将结果作为pool.map的返回值返回。

例如:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()

谢谢!但是现在我该如何填充我的队列呢?我不能把它变成全局的……我也试着把check_one变成一个方法(使用这个方法:https://dev59.com/FXI-5IYBdhLWcg3whIqk),但是还是不行…… - rubik
6
我在这里找到了解决方案:https://dev59.com/f3A75IYBdhLWcg3wlqET。我必须使用`multiprocessing.Manager().Queue()`而不是`multiprocessing.Queue`。 - rubik
关于解决方法,我认为你是对的。我会修复我的代码。再次感谢你! - rubik
1
我知道这个问题涉及到multiprocessing.Pool,但如果我有一个实际的threading.Lock,我该怎么办? - Jeppe

11

在类似问题上进行了大量挖掘后...

事实证明,任何包含threading.Condition()对象的对象都永远不会与multiprocessing.Pool一起使用。

这里有一个例子

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

我用Python 2.7.5运行了这个程序,遇到了同样的错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

但是在Python 3.4.1上运行它,这个问题已经被解决了。

虽然我还没有找到任何有用的解决方法,适用于我们仍在使用2.7.x版本的人。


1
截至Python 3.4.3,threading.Lock仍无法被pickle(尽管如您所提到的,threading.Condition可以)。 - caleb
14
我确认在Python 3.6.2中,threading.Lock 对象仍然不能被序列化。 - astrojuanlu
17
嗯...仍然是3.7。 - jersey bean
1
版本3.6.8,目前遇到这个问题,正在查找Stack Overflow... - RonanFelipe
当我尝试将 threading.RLock 传递给 Pool 初始化程序时,我遇到了相同的问题。在我切换到 multiprocessing.RLock 后,一切都按预期工作了。 - Frank Liu
@caleb @astrojuanlu 确认@Frank的解决方案,使用multiprocessing.Lock代替threading.Lock是可序列化的,并且在池中正常工作。 - Chris Collett

1

我在Docker上使用Python 3.6时遇到了这个问题,将版本更改为3.7.3后问题得到解决。


在Python 3.7.6中遇到了问题。 - datamansahil
Python 3.8仍在努力奋斗。 - Frankie Drake
Python 3.9.13 对我来说。 - kev
对我来说,仍然存在问题。 - Haiping Fan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接