Python多进程:优雅地终止一个进程

4

这样做

import multiprocessing
import schedule


def worker():
     #do some stuff


def sched(argv):
    schedule.every(0.01).minutes.do(worker)          
    while True:
        schedule.run_pending()


processs = []
..
..
p = multiprocessing.Process(target=sched,args)
..
..
processs.append(p)

for p in processs:
    p.terminate()

如何优雅地杀死进程列表?

如果不行,有没有更简单的方法?

目标是重新加载配置文件到内存中,因此我想杀死所有子进程并创建其他进程,后者将读取新的配置文件。

编辑:添加了更多代码以说明我正在运行一个while True循环

编辑:这是@dano suggestion之后的新代码

def get_config(self):
        from ConfigParser import SafeConfigParser
..
        return argv

def sched(self, args, event):
#schedule instruction:
        schedule.every(0.01).minutes.do(self.worker,args)
        while not  event.is_set():
                schedule.run_pending()                                                                    

def dispatch_processs(self, conf):
        processs = []
        event = multiprocessing.Event()

        for conf in self.get_config():
                process = multiprocessing.Process(target=self.sched,args=( i for i in conf), kwargs={'event' : event})
                processs.append((process, event)
return processs

def start_process(self, process):
        process.start()

def gracefull_process(self, process):
        process.join()

def main(self):
        while True:
                processs = self.dispatch_processs(self.get_config())
                print ("%s processes running " % len(processs) )

                for process, event in processs:                                                               

                        self.start_process(process)
                        time.sleep(1)
                        event.set()
                        self.gracefull_process(process)

代码的好处是,我可以编辑配置文件,进程将重新加载其配置。
问题在于只有第一个进程运行,其他进程被忽略。
编辑:这拯救了我的生命,在schedule()中使用while True不是一个好主意,所以我设置了refresh_time。
def sched(self, args, event):

    schedule.every(0.01).minutes.do(self.worker,args)
    for i in range(refresh_time):
            schedule.run_pending() 
            time.sleep(1)

def start_processs(self, processs):
        for p,event in processs:
                if not p.is_alive():
                        p.start()
                time.sleep(1)
                event.set()

        self.gracefull_processs(processs)

def gracefull_processs(self, processs):
        for p,event in processs:
                p.join()
        processs = self.dispatch_processs(self.get_config())
        self.start_processs(processs)

def main(self):

        while True:
                processs = self.dispatch_processs(self.get_config())

                self.start_processs(processs)
                break
        print ("Reloading function main")
        self.main()

这太难了。我想你可以保持一个列表来记录每个子进程的状态,如果它正在执行你不想打断的操作,则状态为繁忙。 - laike9m
你能给我们一个关于你的工作逻辑实际上是什么样子的想法吗?最简单的方法可能是使用multiprocessing.Event来向工作进程发出关闭信号。 - dano
执行一些Python/Bash的脚本(进程之间没有通信),每个子进程都应该处理一个任务。 - 4m1nh4j1
我会在帖子上添加更多信息 :-) - 4m1nh4j1
是的,抱歉,修改已完成。 - 4m1nh4j1
显示剩余2条评论
3个回答

6
如果你不介意只在worker完成所有工作后再中止程序,那么很容易就可以添加一个multiprocessing.Event来优雅地退出程序:
import multiprocessing
import schedule


def worker():
     #do some stuff

def sched(argv, event=None):
    schedule.every(0.01).minutes.do(worker)          
    while not event.is_set():  # Run until we're told to shut down.
        schedule.run_pending()

processes = []
..
..
event = multiprocessing.Event()
p = multiprocessing.Process(target=sched,args, kwargs={'event' : event})
..
..
processes.append((p, event))

# Tell all processes to shut down
for _, event in processes:
    event.set()

# Now actually wait for them to shut down
for p, _ in processes:
    p.join()

谢谢,它可以工作了,但我添加了 for p, _ in processs: p.start() 并在 sched() 的参数列表中添加了 event。程序启动后退出。实际上,事件如何帮助我杀死和重新加载所有进程? - 4m1nh4j1
@4m1nh4j1 嗯,你可能想把启动进程的代码移到一个函数中。然后,当你有一些配置文件更改要推送时,你为所有进程调用 event.set(),等待它们退出,然后重新运行进程启动脚本。实际上,根据配置文件的读取方式,你可能可以使用 event 来告诉每个工作进程重新读取配置文件,而不必全部杀死它们。 - dano
谢谢,我对我的代码进行了编辑,因为我部分地解决了问题,但我仍然有一些问题。 - 4m1nh4j1
@4m1nh4j1 你需要为每个启动的 Process 使用不同的 event。把 event = multiprocessing.Event() 这一行放到 for conf in self.get_config(): 循环内部。 - dano
我找到了一个解决方案(参见更新),我不确定我是否使用了您的最后一条评论,但我从一开始就使用了您的答案来理解和解决问题。非常感谢您的时间。 - 4m1nh4j1

2

A: 不是的,.terminate()和SIG_*方法都比较粗暴

为了优雅地结束任何进程,如您所述,在两端应该有一些“软信号”层,允许发送/接收智能信号,而不依赖于操作系统的解释(操作系统对您的应用程序级上下文和正在处理的工作单元的状态一无所知)。

enter image description here 您可能想要阅读有关此类软信号方法的链接,这些链接来自于 >>> https://stackoverflow.com/a/25373416/3666197


1

不,根据您自己的优雅定义,它不会优雅地终止进程 - 除非您采取一些额外的步骤。

假设您正在使用Unix系统(因为您提到了scp),terminate向子进程发送SIGTERM信号。您可以在子进程中捕获此信号,并相应地采取行动(等待scp完成):

import signal

def on_terminate(signum, stack):
    wait_for_current_scp_operation()

signal.signal(signal.SIGTERM, on_terminate)

这里有一个关于处理和发送信号的教程


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接