Python多进程锁

36

这个多进程代码按预期工作。它创建了4个Python进程,并使用它们来打印数字0到39,每次打印后有延迟。

import multiprocessing
import time

def job(num):
  print num
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

然而,当我尝试使用multiprocessing.Lock来防止多个进程同时打印到标准输出时,程序立即退出且没有任何输出。

import multiprocessing
import time

def job(lock, num):
  lock.acquire()
  print num
  lock.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)
l = multiprocessing.Lock()

lst = range(40)
for i in lst:
  pool.apply_async(job, [l, i])

pool.close()
pool.join()
为什么引入多进程锁(multiprocessing.Lock)会导致这段代码无法工作?
更新:当锁全局声明时(我进行了一些非确定性测试以检查锁是否正常工作),它可以正常工作,而不是像上面的代码那样将锁作为参数传递(Python的多进程文档显示锁应该被作为参数传递)。下面的代码中锁是全局声明的,而不是像上面的代码中作为参数传递。
import multiprocessing
import time

l = multiprocessing.Lock()

def job(num):
  l.acquire()
  print num
  l.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

2
只有在锁被全局声明时才会起作用。锁必须在池实例化之前创建。否则,工作进程会在锁创建之前分叉,并且无法共享它。 - Oren
4个回答

36

如果您将pool.apply_async更改为pool.apply,则会出现此异常:

Traceback (most recent call last):
  File "p.py", line 15, in <module>
    pool.apply(job, [l, i])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
RuntimeError: Lock objects should only be shared between processes through inheritance

pool.apply_async 只是将其隐藏起来。我很不想这么说,但在你的例子中使用全局变量可能是最简单的方法。希望 迅猛龙 不会抓到你。


谢谢!使用apply而不是apply_async似乎是调试这些问题的有用方法。 - dannyadam
是的,apply_async甚至不打印警告信息,这似乎有点愚蠢。 - matsjoyce
1
同意,但是可以在Python 3中使用apply_async的error_callback作为解决方法。解决Python 2的方法 -> https://dev59.com/E4bca4cB1Zd3GeqPcvlx - TitanFighter

16

其他答案已经提供了答案,即除非提供适当的error_callback参数,否则apply_async会默默失败。我仍然认为OP提到的另一点是有效的——官方文档确实展示了将multiprocessing.Lock作为函数参数传递的情况。事实上,在Programming guidelines的“显式地向子进程传递资源”这个小节中,推荐将multiprocessing.Lock对象作为函数参数传递,而不是全局变量。我写了很多代码,其中我将multiprocessing.Lock作为一个参数传递给子进程,一切都按预期工作。

那么问题出在哪里呢?

我首先调查了multiprocessing.Lock是否可被pickle。在Python 3、MacOS+CPython中,尝试pickle multiprocessing.Lock会产生其他人遇到的熟悉的RuntimeError错误提示。

>>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
     99
    100     def __getstate__(self):
--> 101         context.assert_spawning(self)
    102         sl = self._semlock
    103         if sys.platform == 'win32':

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
    354         raise RuntimeError(
    355             '%s objects should only be shared between processes'
--> 356             ' through inheritance' % type(obj).__name__
    357             )

RuntimeError: Lock objects should only be shared between processes through inheritance
对我而言,这证实了确实不可被pickle化。

旁白开始

但是,同一个锁仍需要在两个或多个Python进程之间共享,这些进程将具有自己的潜在不同的地址空间(例如,在使用"spawn"或"forkserver"作为启动方法时)。必须要做一些特殊的事情才能发送Lock到跨进程。这其他StackOverflow帖子似乎表明,在Unix系统中,可能是通过由操作系统本身(在python外部)支持的命名信号量来实现的。然后,两个或多个Python进程可以链接到在一个位置有效地驻留在两个Python进程之外的同一个锁。也可能会有共享内存的实现。

旁白结束

我们是否可以将对象作为参数传递?

经过更多实验和阅读后,看起来区别在于和之间。 允许您将作为参数传递,但则不允许。以下是一个有效的示例:

import multiprocessing
import time
from multiprocessing import Process, Lock


def task(n: int, lock):
    with lock:
        print(f'n={n}')
    time.sleep(0.25)


if __name__ == '__main__':
    multiprocessing.set_start_method('forkserver')
    lock = Lock()
    processes = [Process(target=task, args=(i, lock)) for i in range(20)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
请注意,在“安全导入主模块”子部分中提到的使用__name__ == '__main__'是必要的,详见编程指南multiprocessing.Pool 似乎使用queue.SimpleQueue将每个任务放入队列中,这就是发生 pickling 的地方。很可能,multiprocessing.Process 没有使用 pickling(或者在使用某个特殊版本的 pickling)。

9

我认为原因是多进程池使用pickle在进程间传输对象。但是,Lock无法被序列化:

>>> import multiprocessing
>>> import pickle
>>> lock = multiprocessing.Lock()
>>> lp = pickle.dumps(lock)
Traceback (most recent call last):
  File "<pyshell#3>", line 1, in <module>
    lp = pickle.dumps(lock)
...
RuntimeError: Lock objects should only be shared between processes through inheritance
>>> 

请参阅https://docs.python.org/2/library/multiprocessing.html#all-platforms中的“可拾取性”和“继承优于pickle / unpickle”部分。

2

该stackoverflow帖子所述,Manager.Lock()可能适用于此处。它可以传递给池,因为它可以被pickle。

import multiprocessing
import time

def job(lock, num):
  lock.acquire()
  print num
  lock.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)
m = multiprocessing.Manager()
l = m.Lock()

lst = range(40)
for i in lst:
  pool.apply_async(job, [l, i])

pool.close()
pool.join()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接