Python concurrent.futures:处理子进程中的异常

3

我有一个相当普通的实现 concurrent.futures.ProcessPoolExecutor 的代码,类似于以下形式(使用Python 3.6):

files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))

虽然 processor 是多种可用处理器类之一的实例,它们都共享 process 方法,该方法大致如下:

def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
        session.commit()

_get_records_save_output方法的实现因类而异,但我的问题在于处理错误。我故意进行测试,以便其中一个方法耗尽内存,但我期望上面的except块能够捕获它并移动到下一个文件 - 当我在单个进程中运行代码时,这正是发生的。

如果我像上面描述的那样使用ProcessPoolExecutor,它会引发BrokenProcessPool异常并终止所有执行:

Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

当然,我可以在调用代码中捕获BrokenProcessPool,但我更希望在内部处理错误并继续处理下一个文件。

我还尝试使用标准的multiprocessing.Pool对象,如下所示:

with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)

在这种情况下,行为更加奇怪:在开始处理前两个文件并引发内存不足错误后,它继续处理后面的较小文件,直到完全处理完毕。然而,except块显然从未被触发(没有日志消息,也没有error_time),应用程序仅挂起,既不完成也不执行任何操作,直到手动终止。
我希望try..except块能使每个进程成为自包含的,处理自己的错误而不影响主应用程序。有什么想法如何实现?

1
我认为你无法从内部的process函数/例程中捕获在主流(调用者作用域)级别抛出的异常。当出现上述错误时,请尝试调试executor对象。 - RomanPerekhrest
1个回答

7
所以,在经过大量调试后(并且要感谢@RomanPerekhrest建议检查executor对象),我找到了原因。如问题所述,测试数据包含多个文件,其中两个文件相当大(每个文件超过100万行CSV)。这两个文件都导致我的测试机器(一个2GB的VM)出现故障,但方式不同——虽然第一个文件更大,导致常规的内存不足错误,可以通过except处理,但第二个文件只是导致sigkill。没有进行太多探索,我怀疑更大的文件在读取时(在_get_records方法中完成)根本无法适应内存,而较小的文件则可以,但之后对其进行操作(在_save_output中完成)会导致溢出并杀死进程。

我的解决方案是简单地捕获BrokenProcessPool异常并通知用户问题;我还添加了一个选项,该选项在一个进程中运行处理任务,这种情况下任何过大的文件都将被标记为有错误:

files = get_files()
processor = get_processor_instance()
results = []
if args.nonconcurrent:
    results = list(map(processor.process, files))
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        try:
            results = list(executor.map(processor.process, files))
        except concurrent.futures.process.BrokenProcessPool as ex:
            raise MyCustomProcessingError(
                f"{ex} This might be caused by limited system resources. "
                "Try increasing system memory or disable concurrent processing "
                "using the --nonconcurrent option."
            )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接