如何立即重新抛出在任何工作线程中抛出的异常?

3
我正在使用 ThreadPoolExecutor ,如果任何工作线程失败,我需要中止整个计算。 示例 1. 这将打印成功,无论错误如何,因为 ThreadPoolExecutor 不会自动重新引发异常。
from concurrent.futures import ThreadPoolExecutor

def task():
    raise ValueError

with ThreadPoolExecutor() as executor:
    executor.submit(task)
print('Success')
例子2. 这个例子正确地使主线程崩溃,因为.result()会重新引发异常。但是它会等待第一个任务完成,所以主线程会延迟体验到异常。
import time
from concurrent.futures import ThreadPoolExecutor

def task(should_raise):
    time.sleep(1)
    if should_raise:
        raise ValueError

with ThreadPoolExecutor() as executor:
    executor.submit(task, False).result()
    executor.submit(task, True).result()
print('Success')

如何在主线程中立即注意到工作线程异常,在异常发生后立即处理失败并中止其余工作线程?

2个回答

2

首先,我们必须在请求结果之前提交任务。否则,线程甚至不会并行运行:

futures = []
with ThreadPoolExecutor() as executor:
    futures.append(executor.submit(good_task))
    futures.append(executor.submit(bad_task))
for future in futures:
    future.result()

现在,我们可以将异常信息存储在一个变量中,该变量对主线程和工作线程都可用:
exc_info = None

主线程不能真正终止其子进程,因此我们让工作线程检查异常信息并停止执行:
def good_task():
    global exc_info
    while not exc_info:
        time.sleep(0.1)

def bad_task():
    global exc_info
    time.sleep(0.2)
    try:
        raise ValueError()
    except Exception:
        exc_info = sys.exc_info()

当所有线程终止后,主线程可以检查保存异常信息的变量。如果它被填充了,我们重新引发异常:

if exc_info:
    raise exc_info[0].with_traceback(exc_info[1], exc_info[2])
print('Success')

1

我想,我会这样来实现:

在主进程中,我会创建两个队列:

  1. 一个用于报告异常,
  2. 一个用于通知取消。

::

import multiprocessing as mp

error_queue = mp.Queue()
cancel_queue = mp.Queue()

我创建每个 ThreadPoolExecutor,并将这些队列作为参数传递。
class MyExecutor(concurrent.futures.ThreadPoolExecutor):
    def __init__(self, error_queue, cancel_queue):
        self.error_queue : error_queue
        self.cancel_queue = cancel_queue

每个ThreadPoolExecutor都有一个主循环。在此循环中,我首先扫描cancel_queue以查看是否有“取消”消息可用。
在主循环中,我还实现了异常管理器。如果出现错误,我会引发异常:
self.status = "running"
with True:  # <- or something else
    if not self.cancel_queue.empty():
        self.status = "cancelled"
        break
    try:
        # normal processing
        ...
    except Exception as exc:
        # you can log the exception here for debug
        self.error_queue.put(exc)
        self.status = "error"
        break
    time.sleep(.1)

在主进程中:
运行所有MyExecutor实例。
扫描error_queue:
while True:
    if not error_queue.empty():
        cancel_queue.put("cancel")
    time.sleep(.1)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接