在concurrent.futures中获取异常的原始行号

26

使用concurrent.futures的示例(适用于2.7的后移版本):

import concurrent.futures  # line 01
def f(x):  # line 02
    return x * x  # line 03
data = [1, 2, 3, None, 5]  # line 04
with concurrent.futures.ThreadPoolExecutor(len(data)) as executor:  # line 05
    futures = [executor.submit(f, n) for n in data]  # line 06
    for future in futures:  # line 07
        print(future.result())  # line 08

输出:

1
4
9
Traceback (most recent call last):
  File "C:\test.py", line 8, in <module>
    print future.result()  # line 08
  File "C:\dev\Python27\lib\site-packages\futures-2.1.4-py2.7.egg\concurrent\futures\_base.py", line 397, in result
    return self.__get_result()
  File "C:\dev\Python27\lib\site-packages\futures-2.1.4-py2.7.egg\concurrent\futures\_base.py", line 356, in __get_result
    raise self._exception
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'

"...\_base.py", line 356, in __get_result"这个字符串并不是我期望看到的端点。是否可能获取抛出异常的真实行数?例如:

  File "C:\test.py", line 3, in f
    return x * x  # line 03

Python3 在这种情况下似乎可以显示正确的行号,为什么 Python2.7 不能呢?是否有任何解决方法?


我也在寻找这个问题的答案。谢谢! - calmrat
3个回答

40
我曾经处于和你一样的情况,非常需要获得引发异常的回溯信息。我能够开发出以下ThreadPoolExecutor子类的解决方法。
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor

class ThreadPoolExecutorStackTraced(ThreadPoolExecutor):

    def submit(self, fn, *args, **kwargs):
        """Submits the wrapped function instead of `fn`"""

        return super(ThreadPoolExecutorStackTraced, self).submit(
            self._function_wrapper, fn, *args, **kwargs)

    def _function_wrapper(self, fn, *args, **kwargs):
        """Wraps `fn` in order to preserve the traceback of any kind of
        raised exception

        """
        try:
            return fn(*args, **kwargs)
        except Exception:
            raise sys.exc_info()[0](traceback.format_exc())  # Creates an
                                                             # exception of the
                                                             # same type with the
                                                             # traceback as
                                                             # message

如果您使用这个子类并运行以下代码片段:
def f(x):
    return x * x

data = [1, 2, 3, None, 5]
with ThreadPoolExecutorStackTraced(max_workers=len(data)) as executor:
    futures = [executor.submit(f, n) for n in data]
    for future in futures:
        try:
            print future.result()
        except TypeError as e:
            print e

输出结果将类似于:

1
4
9
Traceback (most recent call last):
  File "future_traceback.py", line 17, in _function_wrapper
    return fn(*args, **kwargs)
  File "future_traceback.py", line 24, in f
    return x * x
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'

25

问题出在futures库对sys.exc_info()的使用上。根据文档:
该函数返回一个由三个值组成的元组,这些值提供有关当前正在处理的异常的信息。[...] 如果堆栈上没有任何地方处理异常,则返回一个包含三个None值的元组。否则,返回的值是(type, value, traceback)。它们的含义是:type获取正在处理的异常的异常类型(一个类对象);value获取异常参数(其关联值或raise的第二个参数,如果异常类型是类对象,则始终为类实例);traceback获取一个封装了异常最初发生点处的调用堆栈的traceback对象。
现在,如果您查看futures的源代码,您就可以自己看到为什么会丢失traceback:当一个异常引发并且要设置到Future对象时,只传递sys.exc_info()[1]。请参见:

https://code.google.com/p/pythonfutures/source/browse/concurrent/futures/thread.py (L:63) https://code.google.com/p/pythonfutures/source/browse/concurrent/futures/_base.py (L:356)

因此,为了避免丢失回溯信息,您需要将其保存在某个地方。我的解决方法是将要提交的函数包装到一个包装器中,该包装器的唯一任务是捕获各种异常并引发相同类型的异常,其消息是回溯信息。通过这样做,当引发异常时,它被包装器捕获并重新引发,然后当sys.exc_info()[1]被分配给Future对象的异常时,回溯信息不会丢失。


2
我在 Stack Overflow 上看了很多答案,但只有这个对我有效 - 谢谢。即使使用其他更近期的方法,我也无法获取带有行号的堆栈跟踪。奇怪的是,这在并发 futures 中仍然是一个问题。 - Jacksporrow
优雅的面向对象编程解决方案,正确地使用了重写。谢谢! - Andrey
1
四年后,库中是否有任何更深入的东西?我认为使用concurrent.futures.ProcessPoolExecutor可能需要更多技巧,因为如果没有更改,“submit”方法将失败并显示“无法pickle 'weakref'对象”。 - matanster
好主意。我认为你可能想要捕获BaseException,而不仅仅是Exception - mike rodent
正如另一位评论者所指出的,在3.9上它会失败并显示“TypeError: cannot pickle '_thread.lock' object”。 - komodovaran_
显示剩余2条评论

15

我认为在ThreadPoolExecutor代码中,原始异常的追踪信息丢失了。它会将异常存储起来,然后稍后重新抛出。这里有一个解决方案。您可以使用traceback模块将您的函数f中的原始异常消息和追踪信息存储到一个字符串中。然后,使用包含f行号等内容的错误消息引发异常。运行f的代码可以被包装在try...except块中,捕获从ThreadPoolExecutor抛出的异常,并打印包含原始追溯信息的消息。

下面的代码对我有效。我认为这个解决方案有点笨拙,而且更愿意能够恢复原始的追踪信息,但我不确定是否可能。

import concurrent.futures
import sys,traceback


def f(x):
    try:
        return x * x
    except Exception, e:
        tracebackString = traceback.format_exc(e)
        raise StandardError, "\n\nError occurred. Original traceback is\n%s\n" %(tracebackString)



data = [1, 2, 3, None, 5]  # line 10

with concurrent.futures.ThreadPoolExecutor(len(data)) as executor:  # line 12
    try:
        futures = [executor.submit(f, n) for n in data]  # line 13
        for future in futures:  # line 14
           print(future.result())  # line 15
    except StandardError, e:
        print "\n"
        print e.message
        print "\n"

这是在Python2.7中的输出结果:

1
4
9




Error occurred. Original traceback is
Traceback (most recent call last):
File "thread.py", line 8, in f
   return x * x
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'

当在Python 3中运行代码时,您的原始代码提供了正确的位置,而在2.7中则没有。原因是在Python 3中,异常会将回溯作为属性进行传递,并且在重新引发异常时,回溯会被扩展而不是替换。下面的示例说明了这一点:

def A():
    raise BaseException("Fish")

def B():
    try:
        A()
    except BaseException as e:
        raise e

B()

我在 Python 2.7Python 3.1 中运行了这个程序。在 2.7 版本中的输出结果如下:

Traceback (most recent call last):
  File "exceptions.py", line 11, in <module>
    B()
  File "exceptions.py", line 9, in B
    raise e
BaseException: Fish

即异常最初是从 A 抛出的这一事实在最终输出中没有记录。当我使用 Python 3.1 运行时,我得到以下结果:

Traceback (most recent call last):
  File "exceptions.py", line 11, in <module>
    B()
  File "exceptions.py", line 9, in B
    raise e
  File "exceptions.py", line 7, in B
    A()
  File "exceptions.py", line 3, in A
    raise BaseException("Fish")
BaseException: Fish

哪个更好?如果我在B中的except块中用raise代替raise e,那么python2.7将给出完整的回溯。我的猜测是,在将此模块向后移植到python2.7时,异常传播方面的差异被忽视了。


感谢您的详细解释,很抱歉答复晚了。 - djeendo

7
从第一个答案中得到启发,这里将其作为装饰器实现:
import functools
import traceback


def reraise_with_stack(func):

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            traceback_str = traceback.format_exc(e)
            raise StandardError("Error occurred. Original traceback "
                                "is\n%s\n" % traceback_str)

    return wrapped

只需在执行的函数上应用装饰器:

@reraise_with_stack
def f():
    pass

1
StandardError已经在Python3中被移除。 - gies0r
这看起来很不错,但无法与ProcessPoolExecutor一起使用。 - Bklyn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接