如何终止多进程池(multiprocessing Pool)中的进程?

16

我正在一个渲染农场工作,需要让我的客户端能够启动多个渲染器实例,而不会阻塞,以便客户端可以接收新命令。我已经成功地实现了这一点,但是我在终止创建的进程方面遇到了困难。

在全局层面上,我定义我的池(以便我可以从任何函数中访问它):

p = Pool(2)

然后我使用apply_async调用我的渲染器:

for i in range(totalInstances):
    p.apply_async(render, (allRenderArgs[i],args[2]), callback=renderFinished)
p.close()

该函数会执行完毕,启动后台进程,并等待新的命令。我创建了一个简单的命令,可以终止客户端并停止渲染:

def close():
    '''
        close this client instance
    '''
    tn.write ("say "+USER+" is leaving the farm\r\n")
    try:
        p.terminate()
    except Exception,e:
        print str(e)
        sys.exit()

看起来没有出错(否则会打印错误信息),Python 终止了,但后台进程仍在运行。有人能推荐一种更好的控制这些启动程序的方法吗?


尝试使用from multiprocessing import util; util.get_logger().setLevel(util.DEBUG)启用调试日志记录,并粘贴输出。 - schlamar
2
我以前见过这种行为,但现在无法重现...我想知道在调用p.terminate()之后是否调用p.join()会有帮助?我也想知道是否需要调用terminate,只是执行sys.exit()是否会适当地回收进程池及其所有进程。 - mdscruggs
当我尝试启用日志记录时,控制台会显示以下信息:"找不到与多进程相关的处理程序。" 不幸的是,即使在p.terminate()之后执行p.join()也没有任何效果,而sys.exit()虽然关闭了Python,但仍会将进程留在后台运行。 - tk421storm
尝试使用multiprocessing.log_to_stderr().setLevel(logging.DEBUG)render()是否会启动其他进程,例如使用subprocess模块? - jfs
4个回答

13
我找到了解决方案:在单独的线程中停止池,就像这样:

我找到了解决方案:在单独的线程中停止池,就像这样:

def close_pool():
    global pool
    pool.close()
    pool.terminate()
    pool.join()

def term(*args,**kwargs):
    sys.stderr.write('\nStopping...')
    # httpd.shutdown()
    stophttp = threading.Thread(target=httpd.shutdown)
    stophttp.start()
    stoppool=threading.Thread(target=close_pool)
    stoppool.daemon=True
    stoppool.start()


signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)

运行良好且始终测试通过。

signal.SIGINT

从键盘中断(CTRL + C)。默认操作是引发KeyboardInterrupt异常。

signal.SIGKILL

杀死信号。它无法被捕获、阻止或忽略。

signal.SIGTERM

终止信号。

signal.SIGQUIT

退出并生成核心转储文件。


SIGTERMSIGINTSIGQUIT是什么? - Sabito stands with Ukraine
2
用于中断程序的操作,例如 ctrl+c、任务终止、sys.exit() 等。 - eri

7
如果您仍然遇到此问题,可以尝试使用守护进程模拟一个Pool(假设您是从非守护进程启动池/进程)。我怀疑这不是最好的解决方案,因为似乎您的Pool进程应该退出,但这是我能想到的所有解决方法。我不知道您的回调函数是做什么的,所以我不确定在我的下面示例中放置它的位置。
我还建议尝试在__main__中创建Pool,因为我有经验(和文档)表明全局生成进程时会出现奇怪的情况。特别是如果您正在使用Windows:http://docs.python.org/2/library/multiprocessing.html#windows
from multiprocessing import Process, JoinableQueue

# the function for each process in our pool
def pool_func(q):
    while True:
        allRenderArg, otherArg = q.get() # blocks until the queue has an item
        try:
            render(allRenderArg, otherArg)
        finally: q.task_done()

# best practice to go through main for multiprocessing
if __name__=='__main__':
    # create the pool
    pool_size = 2
    pool = []
    q = JoinableQueue()
    for x in range(pool_size):
        pool.append(Process(target=pool_func, args=(q,)))

    # start the pool, making it "daemonic" (the pool should exit when this proc exits)
    for p in pool:
        p.daemon = True
        p.start()

    # submit jobs to the queue
    for i in range(totalInstances):
        q.put((allRenderArgs[i], args[2]))

    # wait for all tasks to complete, then exit
    q.join()

1
有趣!在主函数中定义而不是全局定义的技巧很好。我按照这种方式重建了代码,虽然没有解决我的问题(见下文),但我更喜欢这种构造方式。谢谢! - tk421storm

0
# -*- coding:utf-8 -*-
import multiprocessing
import time
import sys
import threading
from functools import partial


#> work func
def f(a,b,c,d,e):
    print('start')
    time.sleep(4)
    print(a,b,c,d,e)

###########> subProcess func
#1. start a thead for work func
#2. waiting thead with a timeout
#3. exit the subProcess
###########
def mulPro(f, *args, **kwargs):
    timeout = kwargs.get('timeout',None)

    #1. 
    t = threading.Thread(target=f, args=args)
    t.setDaemon(True)
    t.start()
    #2. 
    t.join(timeout)
    #3. 
    sys.exit()

if __name__ == "__main__":

    p = multiprocessing.Pool(5)
    for i in range(5):
        #1. process the work func with "subProcess func"
        new_f = partial(mulPro, f, timeout=8)
        #2. fire on
        p.apply_async(new_f, args=(1,2,3,4,5),)

        # p.apply_async(f, args=(1,2,3,4,5), timeout=2)
    for i in range(10):
        time.sleep(1)
        print(i+1,"s")

    p.close()
    # p.join()

-4

找到了自己问题的答案。主要问题是我调用了第三方应用程序而不是函数。当我调用子进程[使用call()或Popen()]时,它会创建一个新的Python实例,其唯一目的是调用新的应用程序。但是当Python退出时,它将杀死这个新的Python实例并保留应用程序运行。

解决方法是通过查找创建的Python进程的pid,获取该pid的子进程,并杀死它们。此代码特定于osx;对于Linux,有更简单的代码(不依赖于grep)可用。

for process in pool:
    processId = process.pid
    print "attempting to terminate "+str(processId)
    command = " ps -o pid,ppid -ax | grep "+str(processId)+" | cut -f 1 -d \" \" | tail -1"
    ps_command = Popen(command, shell=True, stdout=PIPE)
    ps_output = ps_command.stdout.read()
    retcode = ps_command.wait()
    assert retcode == 0, "ps command returned %d" % retcode
    print "child process pid: "+ str(ps_output)
    os.kill(int(ps_output), signal.SIGTERM)
    os.kill(int(processId), signal.SIGTERM)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接