使用Python的多进程时出现键盘中断问题

14

我遇到了一个问题,无法优雅地处理Python多进程中的键盘中断。

(是的,我知道Ctr-C不能保证优雅关闭——但让我们将这个讨论留到另一个主题中)

请考虑以下代码,我使用了 multiprocessing.Manager#list(),它是一个ListProxy,我了解到它可以处理对列表的多进程访问。

当我使用Ctr-C时,尝试访问ListProxy时会出现socket.error: [Errno 2] No such file or directory错误。

我希望在使用Ctr-C时共享列表不会被破坏。这可能吗?!

注意:我希望在不使用池和队列的情况下解决此问题。

from multiprocessing import Process, Manager
from time import sleep

def f(process_number, shared_array):
    try:
        print "starting thread: ", process_number
        shared_array.append(process_number)
        sleep(3)
        shared_array.append(process_number)
    except KeyboardInterrupt:
        print "Keyboard interrupt in process: ", process_number
    finally:
        print "cleaning up thread", process_number

if __name__ == '__main__':

    processes = []

    manager = Manager()
    shared_array = manager.list()

    for i in xrange(4):
        p = Process(target=f, args=(i, shared_array))
        p.start()
        processes.append(p)

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print "Keyboard interrupt in main"

    for item in shared_array:
        # raises "socket.error: [Errno 2] No such file or directory"
        print item

如果你运行它,然后按下Ctrl-C,我们会得到以下结果:

starting thread:  0
starting thread:  1
starting thread:  3
starting thread:  2
^CKeyboard interupt in process:  3
Keyboard interupt in process:  0
cleaning up thread 3
cleaning up thread 0
Keyboard interupt in process:  1
Keyboard interupt in process:  2
cleaning up thread 1
cleaning up thread 2
Keyboard interupt in main
Traceback (most recent call last):
  File "multi.py", line 33, in <module>
    for item in shared_array:
  File "<string>", line 2, in __getitem__
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 169, in Client
    c = SocketClient(address)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 293, in SocketClient
    s.connect(address)
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: [Errno 2] No such file or directory

这里是另一种方法,使用 multiprocessing.Lock 实现类似的效果 ... gist

2个回答

21

multiprocessing.Manager()启动一个子进程,负责处理共享列表代理。

运行时的netstat输出:

unix 2 [ ACC ] STREAM LISTENING 3921657 8457/python /tmp/pymp-B9dcij/listener-X423Ml

由multiprocessing.Manager()创建的这个子进程正在捕获您的SIGINT并退出,导致与其相关的任何内容被解除引用,因此出现了“no such file”错误(根据我决定发送SIGINT的时间,我还收到了几个其他错误)。

为解决此问题,您可以直接声明SyncManager对象(而不是让Manager()为您完成)。 这将要求您使用start()方法实际启动子进程。 start()方法将初始化函数作为其第一个参数(您可以在此处覆盖manager的SIGINT)。

以下是代码,请尝试:

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager, SyncManager
from time import sleep
import signal

#handle SIGINT from SyncManager object
def mgr_sig_handler(signal, frame):
    print 'not closing the mgr'

#initilizer for SyncManager
def mgr_init():
    signal.signal(signal.SIGINT, mgr_sig_handler)
    #signal.signal(signal.SIGINT, signal.SIG_IGN) # <- OR do this to just ignore the signal
    print 'initialized mananger'

def f(process_number, shared_array):
    try:
        print "starting thread: ", process_number
        shared_array.append(process_number)
        sleep(3)
        shared_array.append(process_number)
    except KeyboardInterrupt:
        print "Keyboard interrupt in process: ", process_number
    finally:
        print "cleaning up thread", process_number

if __name__ == '__main__':

    processes = []

     #using syncmanager directly instead of letting Manager() do it for me
    manager = SyncManager()
    manager.start(mgr_init)  #fire up the child manager process
    shared_array = manager.list()

    for i in xrange(4):
        p = Process(target=f, args=(i, shared_array))
        p.start()
        processes.append(p)

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        print "Keyboard interrupt in main"

    for item in shared_array:
        print item

糟糕,我在第一篇帖子中错过了我的导入...已经修复。 - KorreyD
2
很好奇您对以下代码的看法:signal.signal(signal.SIGINT, signal.SIG_IGN) - Jonathan
1
如果你不需要对信号进行任何实际处理,那么可以采用更好的方法。去试试吧。 - KorreyD
1
为什么如果你用threading.Thread替换multiprocessing.Process,运行程序并按下Ctrl+C,SIGINT信号不会发送给主进程和管理器子进程,而只会发送给后者? - Géry Ogam

-1

正如我在类似问题(duplicate)中所回答的:

最简单的解决方案-使用以下命令启动管理器:

manager.start(signal.signal, (signal.SIGINT, signal.SIG_IGN))

使用manager.start()的替代方法。 并检查信号模块是否已导入(import signal)。

这将捕获并忽略管理进程中的SIGINT(Ctrl-C)信号。


这会引发 PicklingError: Can't pickle <function signal at 0x000001705470BB70>: it's not the same object as _signal.signal 错误。 - Géry Ogam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接