Python: Getting a traceback from a multiprocessing.Process

40

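I am trying to get hold of a traceback object from a multiprocessing.Process. Unfortunately, passing the exception info through a pipe does not work, because traceback objects cannot be pickled: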

import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        pipe_to_parent.send(sys.exc_info())

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()

Traceback:

Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "foo", line 7, in foo
    to_parent.send(sys.exc_info())
PicklingError: Can't pickle <type 'traceback'>: attribute lookup __builtin__.traceback failed
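
The same failure can be reproduced without a child process, because a pipe's send() has to pickle whatever it is given. A minimal sketch, assuming Python 3 (where the error surfaces as a TypeError rather than the Python 2 PicklingError shown above); the helper name can_pickle is made up for this illustration:

```python
import pickle
import sys


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


try:
    raise Exception('xxx')
except Exception:
    exc_type, exc_value, tb = sys.exc_info()

# The exception type and instance pickle fine; only the traceback does not.
print(can_pickle(exc_type), can_pickle(exc_value), can_pickle(tb))
```

So the first two items of sys.exc_info() can cross the pipe as they are, and only the third needs special treatment.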

Is there another way to access the exception info? I would like to avoid passing the formatted string.

6 Answers

36

Using tblib, you can pass wrapped exceptions and re-raise them later:

import tblib.pickling_support
tblib.pickling_support.install()

from multiprocessing import Pool
import sys


class ExceptionWrapper(object):

    def __init__(self, ee):
        self.ee = ee
        __, __, self.tb = sys.exc_info()

    def re_raise(self):
        raise self.ee.with_traceback(self.tb)
        # for Python 2 replace the previous line by:
        # raise self.ee, None, self.tb


# example of how to use ExceptionWrapper

def inverse(i):
    """ will fail for i == 0 """
    try:
        return 1.0 / i
    except Exception as e:
        return ExceptionWrapper(e)


def main():
    p = Pool(1)
    results = p.map(inverse, [0, 1, 2, 3])
    for result in results:
        if isinstance(result, ExceptionWrapper):
            result.re_raise()


if __name__ == "__main__":
    main()

So, if you catch an exception in your remote process, wrap it with ExceptionWrapper and pass it back. Calling re_raise() in the main process does the rest.


2
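I'm not sure why this hasn't been upvoted before. It works well for me! Maybe you should include an example of how to use DelayedException. - ForeverWintr
I guess I was late, or people didn't see my idea ;-) - rocksportrocker
This is really neat! I just added a few fixes to make it work, plus an example. - j08lue
1
I see your point @rocksportrocker, my edit did not strictly match what the OP asked for. I added it as a separate answer, just for reference. - j08lue
1
According to the docs, tblib.pickling_support.install() must be called after ExceptionWrapper is declared, or used as a decorator on the class definition. https://github.com/ionelmc/python-tblib#pickling-tracebacks - Arseniy Banayev
1
Depending on the use case, you can avoid the global side effects on the pickle machinery by using tblib's to_dict/from_dict: https://github.com/ionelmc/python-tblib#tblib-traceback-to-dict - ncoghlan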

30

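Since multiprocessing does print the string contents of exceptions raised in child processes, you can wrap all your child process code in a try-except that catches any exception, formats the relevant stack trace, and raises a new Exception that holds all of that information in its string.

Here is an example of a function I use with multiprocessing.Pool.map: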

import sys
import traceback

def run_functor(functor):
    """
    Given a no-argument functor, run it and return its result. We can 
    use this with multiprocessing.map and map it over a list of job 
    functors to do them.

    Handles getting more than multiprocessing's pitiful exception output
    """

    try:
        # This is where you do your actual work
        return functor()
    except:
        # Put all exception text into an exception and raise that
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

What you get is a stack trace with another formatted stack trace as its error message, which helps with debugging.
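
The "no-argument functor" can be built with functools.partial. The following is a sketch under assumptions, targeting Python 3: it uses operator.truediv as a stand-in job and simulates the trip to a worker with a pickle round trip instead of starting a real pool, so the names jobs and job are illustrative only.

```python
import functools
import operator
import pickle
import sys
import traceback


def run_functor(functor):
    """Run a no-argument callable; on failure, raise an Exception
    whose message is the full formatted traceback."""
    try:
        return functor()
    except Exception:
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))


# partial objects wrapping importable functions are picklable, so they
# can be shipped to a worker as no-argument functors.
jobs = [functools.partial(operator.truediv, 1, b) for b in (1, 0)]

for job in jobs:
    job = pickle.loads(pickle.dumps(job))  # simulate sending it to a worker
    try:
        print(run_functor(job))
    except Exception as exc:
        print("worker failed with:\n%s" % exc)
```

With a real pool, the same list could be passed as the second argument of the pool's map method, with run_functor as the first.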


1
The OP explicitly said he wants to avoid passing formatted strings. - Will Vousden

14

Making traceback objects picklable seems to be difficult. But you can send just the first two items of sys.exc_info(), plus pre-extracted traceback information obtained with traceback.extract_tb:

import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        except_type, except_class, tb = sys.exc_info()
        pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()

This gives you:

(<type 'exceptions.Exception'>, Exception('xxx',), [('test_tb.py', 7, 'foo', "raise Exception('xxx')")])

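After that, you will be able to get more information about the cause of the exception (the filename, line number, and function name where it was raised, and the statement that raised it).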


1
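I used exactly the same approach, but note that in Python 3 the traceback gives you FrameSummary objects (instead of tuples), so this approach will not work as-is. FrameSummary objects are not JSON-serializable (they may be picklable). - smido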
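
One way around the FrameSummary issue in Python 3 is to flatten each frame into a plain tuple before sending it. This is only a sketch: it stands in for the pipe with a pickle round trip, and the names fail, frames, and payload are made up for the illustration.

```python
import pickle
import sys
import traceback


def fail():
    raise Exception('xxx')


try:
    fail()
except Exception:
    exc_type, exc_value, tb = sys.exc_info()
    # Each FrameSummary unpacks to (filename, lineno, name, line).
    frames = [tuple(frame) for frame in traceback.extract_tb(tb)]
    payload = pickle.dumps((exc_type, exc_value, frames))

# What the parent would do with the object it receives from the pipe:
recv_type, recv_value, recv_frames = pickle.loads(payload)
report = "".join(traceback.format_list(recv_frames))
report += "".join(traceback.format_exception_only(recv_type, recv_value))
print(report)
```

The parent still receives structured data (plain tuples that are also JSON-friendly) and only formats it when a readable report is needed.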

11

Python 3

In Python 3, the exception re-raised by the get method of multiprocessing.pool.AsyncResult now carries the full traceback from the worker; see http://bugs.python.org/issue13831

Python 2

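Use traceback.format_exc (which stands for "formatted exception") to get the traceback string. It is much more convenient to wrap this in a decorator, as below: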

def full_traceback(func):
    import traceback, functools
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            raise type(e)(msg)
    return wrapper

Example:

def func0():
    raise NameError("Exception in func0")

def func1():
    return func0()

# Key is here!
@full_traceback
def main(i):
    return func1()

if __name__ == '__main__':
    from multiprocessing import Pool
    pool = Pool(4)
    try:
        results = pool.map_async(main, range(5)).get(1e5)
    finally:
        pool.close()
        pool.join()

The traceback with the decorator:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0

Original Traceback (most recent call last):
  File "bt.py", line 13, in wrapper
    return func(*args, **kwargs)
  File "bt.py", line 27, in main
    return func1()
  File "bt.py", line 23, in func1
    return func0()
  File "bt.py", line 20, in func0
    raise NameError("Exception in func0")
NameError: Exception in func0

The traceback without the decorator:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0

However, the traceback contains two extra lines because of the wrapper function. I think that is acceptable, right? - Syrtis Major
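
One caveat with the full_traceback decorator above: type(e)(msg) assumes the exception class can be constructed from a single message, which does not hold for every class (UnicodeDecodeError, for example, requires five arguments). The variant below is a sketch of one possible workaround, not part of the original answer; the name full_traceback_safe and the choice of RuntimeError as the fallback are assumptions.

```python
import functools
import traceback


def full_traceback_safe(func):
    """Like full_traceback, but falls back to RuntimeError when the
    original exception type cannot be rebuilt from one message."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            try:
                new_exc = type(e)(msg)
            except Exception:
                # e.g. UnicodeDecodeError needs five constructor arguments
                new_exc = RuntimeError(msg)
            raise new_exc
    return wrapper


@full_traceback_safe
def decode(data):
    return data.decode("ascii")


try:
    decode(b"\xff")
except RuntimeError as exc:
    print(exc)
```

The exception type is lost in the fallback case, but the original type name is still visible in the embedded traceback text.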

4
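This is a variation on this excellent answer. Both rely on tblib to store the traceback.

However, instead of having to return the exception object (as the OP asked for), the worker function can be left as it is and just be wrapped in try/except, storing the exception so it can be re-raised later.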

import tblib.pickling_support
tblib.pickling_support.install()

import sys

class DelayedException(Exception):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()
        super(DelayedException, self).__init__(str(ee))

    def re_raise(self):
        # Python 2 syntax; in Python 3 use: raise self.ee.with_traceback(self.tb)
        raise self.ee, None, self.tb

Example

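def worker(_):
    # imap passes one item per call, so the worker must accept an argument
    try:
        raise ValueError('Something went wrong.')
    except Exception as e:
        raise DelayedException(e)


if __name__ == '__main__':

    import multiprocessing

    pool = multiprocessing.Pool()
    try:
        # imap is lazy: iterate over it so worker exceptions surface here
        list(pool.imap(worker, [1, 2, 3]))
    except DelayedException as e:
        e.re_raise()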

0

The same solution as @Syrtis Major and @interfect, but tested with Python 3.6:

import sys
import traceback
import functools

def catch_remote_exceptions(wrapped_function):
    """ https://dev59.com/NW025IYBdhLWcg3wPzfL """

    @functools.wraps(wrapped_function)
    def new_function(*args, **kwargs):
        try:
            return wrapped_function(*args, **kwargs)

        except:
            raise Exception( "".join(traceback.format_exception(*sys.exc_info())) )

    return new_function

Usage:

class ProcessLocker(object):
    @catch_remote_exceptions
    def __init__(self):
        super().__init__()

    @catch_remote_exceptions
    def create_process_locks(self, total_processes):
        self.process_locks = []
        # ...
