Python 多进程池超时问题

4

我想使用multiprocessing.Pool,但是它无法在超时后中止任务。我找到了解决方案并进行了一些修改。

from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
       if time.time() - start >= y:
           break
       time.sleep(0.5)
       # show work progress
       print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        print("Aborting due to timeout {}".format(args[1]))
        # kill worker itself when get TimeoutError
        sys.exit(1)
    else:
        return out


def empty_func():
    pass


if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)

    # k - time to job sleep
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # check available worker
        pool.apply(empty_func)

        # run job with timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)

    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")

主要修改 - 工作进程使用 sys.exit(1) 退出。这将终止工作进程并杀死作业线程,但我不确定这种解决方案是否好。当进程在运行作业时自行终止时,可能会出现哪些潜在问题?


好的。我想你最好在worker()中处理超时并将结果写入一个公共集合。这样,你只需要在所有线程上调用join(),然后处理结果。如果你的系统没有过重的负载,事情应该会顺利进行。 - mljli
1个回答

13

停止正在运行的作业并不会带来隐含的风险,操作系统将负责正确终止进程。

如果您的作业正在写文件,则可能会在磁盘上留下许多被截断的文件。

如果您正在写入数据库或与某些远程进程连接,则可能出现一些小问题。

但是,Python标准池不支持在任务超时时终止工作程序。强制终止进程可能会导致应用程序内出现奇怪的行为。

Pebble处理池支持任务超时。

from pebble import ProcessPool
from concurrent.futures import TimeoutError

TIMEOUT_SECONDS = 5

def function(one, two):
    return one + two

with ProcessPool() as pool:
    future = pool.schedule(function, args=(1, 2), timeout=TIMEOUT_SECONDS)

    try:
        result = future.result()
    except TimeoutError:
        print("Future: %s took more than 5 seconds to complete" % future)

它看起来不错。你知道有没有在生产中使用它的成功案例吗? - rusnasonov
1
不确定我是否理解正确。您要生产中Pebble的成功案例还是系统终止进程的成功案例?Pebble是一个相当稳定的库,拥有相当数量的下载量。 - noxdafox
是的,你理解得没错。你知道有哪些项目使用Peeble吗? - rusnasonov
1
我们在一些系统中生产使用Pebble,它的表现非常好。我不知道有任何公共项目在使用它。 - noxdafox
1
感谢 @noxdafox,Pebble 在我不断尝试和错误超时锁定两天后拯救了我。 - ASHu2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接