Python多进程模块:使用超时时间加入进程

16

我正在对一个复杂的模拟程序进行参数优化,使用了multiprocessing模块来提高优化算法的性能。我在http://pymotw.com/2/multiprocessing/basics.html学习了multiprocessing的基础知识。

不同参数下的模拟持续时间不同,大约为1到5分钟。如果选择的参数非常糟糕,模拟可能需要30分钟甚至更长时间,而结果也是无用的。因此,我考虑在multiprocessing中加入超时功能,以终止持续时间超过指定时间的所有模拟。以下是该问题的摘要:

import numpy as np
import time
import multiprocessing

def worker(num):
    
    time.sleep(np.random.random()*20)

def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

代码行p.join(5)定义了5秒的超时时间。由于for循环for p in procs:,程序会等待5秒直到第一个进程结束,然后再等待5秒直到第二个进程结束,以此类推。但是我希望程序可以终止所有持续时间超过5秒的进程。此外,如果没有任何进程持续时间超过5秒,程序就不应该等待这5秒。


请看这里:https://dev59.com/9HM_5IYBdhLWcg3w3nRs。这可能是一个重复的问题,但我不确定是否应该为您标记它。如果那个答案提出的解决方案对您不起作用,请告诉我们原因。 - skrrgwasme
这是一篇有趣的文章,但我认为它只是解决了连续启动而不是同时启动的进程的问题。我的程序应该同时启动进程,并杀死超过“全局”超时时间的进程。 - brp
3个回答

17

您可以通过创建一个循环来实现,该循环将等待一定的超时时间,经常检查所有进程是否已完成。如果它们没有在分配的时间内全部完成,则终止所有进程:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()

1
谢谢,这似乎是一个合适的解决方案。不幸的是,如果进程在超时之前完成,此代码不会中断。我尝试将工作函数设置为 time.sleep(1),并且在1秒后所有的 p.is_alive() 都返回 False。所以现在代码应该进入 break 语句,但它仍然在等待超时... - brp
我找到了问题:print (p.is_alive() for p in procs) 返回 <generator object <genexpr> at 0x05712B20>,但它应该是一个包含元素 TrueFalse 的列表,以便于 any() 函数的理解。 - brp
@brp 使用 any([p.is_alive() for p in procs])。这样它就变成了列表推导而不是生成器表达式。 - dano
1
@brp 噢,我刚注意到你正在使用 np.any 而非内置的 any。这就是生成器表达式无法工作的原因。np.any 仅适用于类似数组的对象。 - dano
内置的any与列表推导式是关键!谢谢! - brp

13

如果您想杀死所有进程,您可以使用multiprocessing中的Pool。您需要为所有执行定义一个通用超时时间,而不是个别超时时间。

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))

if __name__ == "__main__":
    main()

这将为您按顺序列出所有工作进程的返回值列表。
更多信息请参见: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool


我认为这并没有真正终止池中的线程——即使它们没有完成,它只是将执行返回到主线程。 - Ben Wheeler
1
我不明白为什么我们要执行 pool_result.get(timeout=1),即:如果池结果已经准备好了,那么结果不应该也准备好了,不需要超时吗? - Kieleth

3

多亏了dano的帮助,我找到了解决方案:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()

if __name__ == "__main__":
    main()

这不是最优雅的方式,我相信使用bool_list肯定有更好的方法。在超时5秒后仍然存在的进程将被终止。如果您在工作函数中设置的时间比超时时间短,您会发现程序在达到5秒超时之前就停止了。如果有更加优雅的解决方案,我依然持开放态度 :)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接