捕获键盘中断以停止Python多进程工作队列的工作

5

我从StackOverflow的几篇文章中获取了灵感编写了以下代码。

场景

我想要创建一个多进程队列,在该队列上有多个工作进程“监听”。

在接收到键盘中断的情况下,主进程不再向队列中加入新的项目,并且利用哨兵对象来优雅地停止工作进程。

问题

我目前使用的版本存在以下问题:

signal.signal(signal.SIGINT, signal.SIG_IGN) 

忽略Ctrl + C的同时也忽略主进程。
有什么想法吗?我需要使用多进程工作池吗?一些示例表明我可能需要。那我还能用队列吗?
from multiprocessing import Pool, Process,Queue
import time
import signal
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process


class Worker(Process):
    def __init__(self, queue,ident):
        super(Worker, self).__init__()
        # Ignore Signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.queue= queue
        self.idstr= str(ident)
        print "Ident" + self.idstr

    def run(self):
        print 'Worker started'
        # do some initialization here

        print 'Computing things!'
        for data in iter( self.queue.get, None ):
            print "#" + self.idstr + " : " + str(data)
            time.sleep(5)
            print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())

        print "Worker Done"

#### Main ####
request_queue = Queue(10)

for i in range(4):
    Worker( request_queue,i ).start()

try:
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put( None ) 

except KeyboardInterrupt:
    print "Caught KeyboardInterrupt, terminating workers"
    while  request_queue.empty()==False:
         request_queue.get()
    request_queue.put( None )    

请查看此链接:https://dev59.com/r2855IYBdhLWcg3wrGZY - Paco
嘿,帕科,是的,我找到了它,我正在考虑如何集成它。这样队列就会停止填充,工作人员也会停止。初步想法是,我仍然希望在主线程中保留异常。 - Dukeatcoding
2个回答

1
根据你的解决方案(非常好),我增加了一层额外的保护,以防主代码无响应并且用户取消两次:
global STOP

import os, signal
def signal_handler(sig, frame):
    global STOP
    if STOP:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.kill(os.getpid(), signal.SIGTERM)
    STOP = True
signal.signal(signal.SIGINT, signal_handler)

0

我想我找到了一个解决方案。虽然我不喜欢从主线程得到1次SIGINT信号,而从Worker线程得到4次,但也许我必须接受这个事实。

  1. 我为中断信号指定了一个信号处理程序。

  2. 收到第一个Sig INT后,我忽略更多的SIG Int信号

  3. 我将停止标志切换为TRUE

  4. 我退出队列插入循环

  5. 我调用停止函数,清除队列并插入停止哨兵

    from multiprocessing import Pool, Process,Queue
    import time
    import signal
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process
    
    # 停止循环的标志
    stop = False
    
    # 定义SIGINT
    def signal_handler(sig, frame):
        print '您按下了Ctrl+C!'
        global stop
        stop = True
        # 忽略更多的Ctrl+C
        signal.signal(signal.SIGINT, signal.SIG_IGN) 
    
    signal.signal(signal.SIGINT, signal_handler)
    
    def stopSentinel(request_queue):
        print "CTRL Stop Queue and insert None"
    
        # 清空现有队列
        while  request_queue.empty()==False:
            request_queue.get()
    
        # 为每个工作进程放置一个None
        for i in range(4):
            request_queue.put( None ) 
    
    
    class Worker(Process):
        def __init__(self, queue,ident):
            super(Worker, self).__init__()
    
            self.queue= queue
            self.idstr= str(ident)
            print "Ident" + self.idstr
    
        def run(self):
            print '工作进程已启动'
            # 在此处进行一些初始化
    
            print '正在计算!'
            for data in iter( self.queue.get, None ):
                print "#" + self.idstr + " : " + str(data)
                time.sleep(5)
                print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())
    
            print "工作进程完成"
    
    
    
    #### 主函数 #####
    request_queue = Queue(10)
    
    for i in range(4):
        Worker( request_queue, i ).start()
    
    #### 填充队列数据 ####
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # 用哨兵对象允许干净的关闭:每个工作进程一个。
    
        # 检查是否停止
        print "检查退出"
        if stop == True:
            print "停止中断"
            break
    
    if stop == True:
        stopSentinel(request_queue)
    else:       
        print "正常停止" 
        for i in range(4):
            request_queue.put( None ) 
    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接