如何优雅地停止一个附着于队列的Python多进程工作器在无限循环中运行?

20

我正在使用 multiprocessing.Poolmultiprocessing.Queue 在 Python 中实现生产者消费者模式。消费者是预先分叉的进程,使用 gevent 来生成多个任务。

这是代码的精简版本:

import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time

# Task queue
queue = Queue()

def init_worker ():
    # Ignore signals in worker
    signal.signal( signal.SIGTERM, signal.SIG_IGN )
    signal.signal( signal.SIGINT, signal.SIG_IGN )
    signal.signal( signal.SIGQUIT, signal.SIG_IGN )

# One of the worker task
def worker_task1( ):
    while True:
        try:
            m = queue.get( timeout = 2 )

            # Break out if producer says quit
            if m == 'QUIT':
                print 'TIME TO QUIT'
                break

        except QueueEmpty:
            pass

# Worker
def work( ):
    gevent.joinall([
        gevent.spawn( worker_task1 ),
    ])

pool = Pool( 2, init_worker )
for i in xrange( 2 ):
    pool.apply_async( work )

try:
    while True:
        queue.put( 'Some Task' )
        time.sleep( 2 )

except KeyboardInterrupt as e:
    print 'STOPPING'

    # Signal all workers to quit
    for i in xrange( 2 ):
        queue.put( 'QUIT' )

    pool.join()

现在当我试图退出它时,我得到了以下状态:

  1. 父进程正在等待其中一个子进程加入。
  2. 其中一个子进程处于僵尸(defunct)状态。所以已经结束了,但父进程正在等待其他子进程结束。
  3. 另一个子进程正在显示:futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...

那么正确的方法是什么,可以干净地结束这样的进程?

1个回答

14

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接