一旦Python多进程程序中的一个工作进程满足特定条件,如何终止该程序

20

我正在使用Python的multiprocessing模块编写程序。该程序调用多个工作函数,每个函数产生一个随机数。一旦其中一个工作函数生成的数字大于0.7,我需要终止程序。

以下是我的程序,其中"如何实现"尚未填写。有任何想法吗?谢谢。

import time
import numpy as np
import multiprocessing as mp
import time
import sys

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    print "From i = ",i, "       res = ",res
    if res>0.7:
        print "find it"
        # terminate  ???? Question: How to do this???


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

你正在寻找一种简化父子进程之间通信的方法。这个问题在父子进程之间的通信中得到了解答。 - Akshat Mahajan
4个回答

37

没有任何过程能在不使用像 os.kill() 这样的暴力手段的情况下阻止另一个进程。别去那里。

要理智地完成这个任务,你需要重新构思你的基本方法:主进程和工作进程需要相互通信。

我会详细说明,但目前的示例太简单了,没什么用处。例如,按照目前的写法,只调用了 rand() 不超过 num_workers 次,因此没有理由认为其中任何一次必须大于 0.7。

一旦工作函数增加了循环,就变得更加明显了。例如,工作进程可以在循环的顶部检查是否设置了一个 mp.Event,如果设置了,就退出。主进程将在希望工作进程停止时设置该事件。

当一个工作进程找到一个值大于 0.7 时,它可以设置一个不同的 mp.Event。主进程将等待该事件,然后设置 "停止时间" 的 Event,以供工作进程查看,然后做常规循环,使用 .join() 关闭工作线程。

编辑

以下是一个具备可移植性和清晰性的完整解决方案,假设工作进程将一直运行,直到至少有一个找到大于 0.7 的值。注意,我从中删除了 numpy,因为它与此代码无关。这里的代码应该在支持 multiprocessing 的任何平台上的标准 Python 下正常工作:

import random
from time import sleep

def worker(i, quit, foundit):
    print "%d started" % i
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print '%d found %g' % (i, x)
            foundit.set()
            break
        sleep(0.1)
    print "%d is done" % i

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
    foundit.wait()
    quit.set()

还有一些示例输出:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

所有的东西都能够干净地关闭:没有追踪,没有异常终止,没有残留的僵尸进程……就像吹口哨一样干净。

结束它

正如@noxdafox所指出的那样,有一个Pool.terminate()方法,它可以尽最大努力跨平台杀死工作进程,无论它们正在做什么(例如,在Windows上,它调用平台的TerminateProcess())。我不建议在生产代码中使用它,因为突然终止进程可能会使各种共享资源处于不一致的状态,或者让它们泄漏。在multiprocessing文档中有各种警告,您应该将其添加到您的操作系统文档中。

不过,这可能是方便的!以下是使用此方法的完整程序。请注意,我将截止值提高到了0.95,以使其更可能需要花费比眨眼更长的时间才能运行:

import random
from time import sleep

def worker(i):
    print "%d started" % i
    while True:
        x = random.random()
        print '%d found %g' % (i, x)
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print "quitting with %g" % arg
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

还有一些示例输出:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

对于第二个“pool”方法,倒数第二行的p.close()有什么作用? - WoooHaaaa
始终遵循预定的关闭程序是最佳实践。 - Tim Peters
@TimPeters 谢谢你的解决方案。我该如何获取返回值?(不仅仅是将其打印出来) - diegus
@diegus,通过您喜欢的任何进程间通信机制。这个答案中的第二种方法已经展示了一种:使用回调将结果传递回主进程。但您也可以使用其他任何您喜欢的方式(例如,队列、管道、共享内存等)。这并不在_原始_问题的范围内,所以我在这里不会再多说。 - Tim Peters
如何使用map和imap?我无法提供任何参数。 - Amir

5

有一种更干净、更符合Python风格的方法可以实现你想要做的事情,这是通过使用 multiprocessing.Pool 提供的回调函数来实现的。

你可以查看这个问题以查看一个实现示例。


2
作为其他用户提到的,你需要让进程相互沟通才能使它们终止对等方。虽然你可以使用 os.kill 来终止对等进程,但是向其发出终止信号更加优雅。
我使用的解决方案非常简单: 1. 查找主进程的进程 ID(pid),该进程会生成所有其他工作进程。该连接信息可以从操作系统中获取,操作系统会跟踪哪个子进程是由哪个父进程生成的。 2. 当其中一个工作进程达到结束条件时,它使用父进程 ID 查找主进程的所有子进程(包括自身),然后遍历列表并向它们发送终止信号(确保不向自身发送信号) 以下代码包含可行的解决方案。
import time
import numpy as np
import multiprocessing as mp
import time
import sys
import os
import psutil
import signal

pid_array = []

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print "From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process
    if res>0.7:
        print "find it"
        # solution: use the parent child connection between processes
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            if not (process.pid == current_process):
                print "Process: ",current_process,  " killed process: ", process.pid
                process.send_signal(signal.SIGTERM)


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    main_process = os.getpid()
    print "Main process: ", main_process
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

输出结果清楚地展示了正在发生的事情:
Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266

只是注意,这仅适用于通过fork()启动新进程的平台(如Linux)- 否则,当工作进程尝试访问main_process时,它将引发NameError - Tim Peters
1
psutils 是另一个你需要下载的第三方包。记住,我在我的回答中已经说过“不要去那里”;-) - Tim Peters
我不会回答“是否必要”的问题,因为无论是否必要,这都是一种丑陋、蛮力、不可移植的方法,而本应该使用干净、可移植的进程间通信工具multiprocessing来实现非常简单的应用程序。如果你不愿意学习如何使用它们,那么你在多进程应用程序方面的职业生涯将是痛苦的,但幸运的是很短暂的;-) - Tim Peters
@TimPeters 我觉得你肯定有一个优雅的答案在脑海中。你能详细说明一下吗? - zell
@zell,我已经在这里给出了答案。是的,这只是一个草图,但示例程序做得太少了,我无法猜测你真正需要什么。 - Tim Peters
显示剩余4条评论

-5

要终止您的程序,只需从sys导入exit()

import sys 

sys.exit()

谢谢。但是你的答案是错误的。sys.exit()只终止执行它的进程。 - zell
1
从文档中得知:“由于exit()最终“仅”引发异常,因此仅在从主线程调用时才会退出进程,并且不会拦截异常。” 它无法帮助终止多个进程,也无法解决实际问题:向主进程通信已满足终止条件。 - Akshat Mahajan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接