在concurrent.futures.Executor.map中进行异常处理

27

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map

如果一个函数调用引发异常,那么当从迭代器中检索其值时,该异常将被引发。

以下代码片段仅输出第一个异常(Exception: 1),然后停止。这是否与上述陈述相矛盾?我期望以下代码在循环中打印出所有的异常。

def test_func(val):
    raise Exception(val)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for r in executor.map(test_func,[1,2,3,4,5]):
        try:
            print r
        except Exception as exc:
            print 'generated an exception: %s' % (exc)


相关:https://dev59.com/2lwX5IYBdhLWcg3wvRgf - Ciro Santilli OurBigBook.com
4个回答

27

Ehsan的解决方案很好,但是将结果作为完成的内容取出可能会比等待列表中的顺序项目更有效率。以下是来自库文档的示例。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

1
谢谢。我一直在为什么无法执行脚本的所有部分而苦恼。 - Vitalis
2
感谢您的精彩解释。 - Arnab Mukherjee

16

正如上面提到的那样,不幸的是,executor.map的API很有限,只允许您获取第一个异常。此外,在遍历结果时,您只会得到第一个异常之前的值。

回答您的问题,如果您不想使用不同的库,可以展开您的映射,手动应用每个函数:

future_list = []
with concurrent.futures.ThreadPoolExecutor() as executor:
  for arg in range(10):
    future = executor.submit(test_func, arg)
    future_list.append(future)

for future in future_list:
  try:
    print(future.result())
  except Exception as e:
    print(e)

这使您能够单独处理每个未来。


8
map 方法返回一个生成器,可以在结果准备好后进行迭代。
不幸的是,在异常发生后无法恢复生成器。参考PEP 255

如果由生成器函数引发或传递未处理的异常(包括但不限于StopIteration),则异常会以通常的方式传递给调用者,并且随后尝试恢复生成器函数将引发StopIteration。换句话说,未处理的异常终止了生成器的有用生命周期。

还有其他库,如pebble,允许在出现错误后继续迭代。请查看文档中的示例

Pebble是Python中用于多线程和多进程的常见框架吗?现在Python有自己的本机concurrent.futures模块,那么Pebble是不是已经变得多余了?谢谢。 - user1008636
Pebble 解决了 Python 原生库的一些限制,比如上述例子和其他问题,比如终止超时任务。 - noxdafox
现在concurrent.futures没有这样的选项吗? - user1008636
不是的。这就是为什么设计pebble库。另一个值得检查的替代库是billiard - noxdafox
这是一个奇怪的行为...我想知道为什么会这样... - Anton Daneyko

5

虽然其他人已经给出了有关捕获多个异常的正确方式的优秀答案,但我想回答一下为什么在问题中捕获异常的方式是错误的。以下代码片段:

class ExceptionA(Exception):
    pass


def test_func(val):
    raise ExceptionA(val)


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    try:
        for r in executor.map(test_func, [1, 2, 3, 4, 5]):
            try:
                print(r)
            except ExceptionA as exc:
                print(f'Catch inside: {exc}')

    except ExceptionA as exc:
        print(f'Catch outside: {exc}')

输出结果为Catch outside: 1

Python文档中写道:

如果一个函数调用引发异常,则当从迭代器检索其值时,该异常将被引发。

这意味着如果你想要捕获异常,你需要在循环外面捕获它,因为值是在循环语句而不是打印语句处检索的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接