Python多进程卡死问题

5

以下是最小可重现代码:

import multiprocessing

class E(Exception):
    def __init__(self, a1, a2):
        Exception.__init__(self, '{}{}'.format(a1, a2))

def f(_):
    raise E(1, 2)

multiprocessing.Pool(1).map(f, (1,))

这会导致以下错误:
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.7/multiprocessing/pool.py", line 496, in _handle_results
    task = get()
  File "/usr/lib/python3.7/multiprocessing/contrnection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() missing 1 required positional argument: 'a2'

有没有办法解决这个问题?类似的问题在这里提到:https://bugs.python.org/issue39751

1
“get hung”是一个非常糟糕的描述。它并不是指进程挂起,而是会出现明确的错误信息导致失败。原始错误提供了更好的描述,同时也提供了原因!所以,简单的问题是,您是否尝试调整使得pickling工作? - Ulrich Eckhardt
1
如果您不引发异常并将其print,它就能正常工作。因此问题可能出现在引发异常的过程中。也许与与主线程通信有关。 - Curtwagner1984
@sadasd 是的,我明白。我只是想说,也许这会帮助其他人更好地找到问题所在。因为仅从异常信息中读取,对我来说并不明显问题本身就在于引发它。但也许应该是这样的。 - Curtwagner1984
@Curtwagner1984,即使在raise中,池工作进程目标函数也会捕获错误并尝试将其发送回主进程,但无法对E实例进行反序列化。 - Aaron
当我运行这个程序时,它实际上会挂起而不是返回一个错误。 - JeffUK
2个回答

1
我认为这与__reduce__函数返回的内容有关。它可能会返回类似于(self.__class__, self.args)的东西,基本上是说要重新创建此对象,请传递存储在self.args中的所有内容,该内容等于("12",)。
由于没有将一个字符串拆分回原始参数的方法,因此您可能不想在存储之前处理格式,并且应该只实现一个__str__方法。
class E(Exception):
    def __init__(self, a1, a2):
        super().__init__(a1, a2)

    def __str__(self):
        return f"{self.args[0]}{self.args[1]}"

编辑:我增加了其他可能性,但不建议使用。

# Change reduce
class E(Exception):
    def __init__(self, a1, a2):
        super().__init__(f"{a1}{a2}")

    def __reduce__(self, *args, **kwargs):
        return (self.__class__, (self.args[0], ""))

# Optional arg this one is similar to Booboo's answer.
class E(Exception):
    def __init__(self, a1, a2=""):
        # I am assuming you have a more complex format string when not using the toy example
        # If so you need an if here.
        if a2:
            super().__init__(f"{a1}{a2}")
        else:
            super().__init__(a1)
    

我认为你的分析可能是正确的。但是,在对象实例化时计算两个输入参数的函数与将异常转换为字符串(如果有)时计算它们之间可能存在巨大的语义差异。显然,在这种情况下不是这样,但你明白我的意思。 - Booboo
可能是这样,但由于BaseException的实现方式,你并没有真正的选择,你传递给super调用的参数会在重新创建Custom Exception时传递到它。你可以重写__reduce__,但这可能会更加灾难性。 - Nathan Buckner
看看我的回答。它距离你的并不遥远;只需要将你的目光移向页面下方即可。 - Booboo
我仍然认为了解目标很重要,如果目标是在异常中强制使用所需的参数并格式化消息,则您的解决方案无法实现。使用 *args 会丢失方法签名,如果传入4个参数会发生什么等情况。 - Nathan Buckner

0

我尝试通过定义pickle函数__getstate____getstate__来解决这个问题,但它们甚至没有被调用。然而,以下方法可以绕过这个问题,直到问题得到解决(我确信这是与reduction.ForkingPickler类有关的问题):

import multiprocessing

class E(Exception):
    def __init__(self, *args):
        if len(args) == 2:
            # Normal instantiation:
            Exception.__init__(self, '{}{}'.format(args[0], args[1]))
        else:
            # We are being pickled by the reduction.ForkingPickler:
            assert(len(args) == 1)
            Exception.__init__(self, args[0])

def f(_):
    raise E(1, 2)

if __name__ == '__main__': # Required for Windows
    try:
        multiprocessing.Pool(1).map(f, (1,))
    except E as e:
        print('Got E Exception:', e)

输出:

Got E Exception: 12

在我看来,似乎Exception.__new__直接调用了cls.__init__,这似乎违反了"在取消pickle化实例时不调用__init__()。"我不知道...(使用__reduce_ex__的结果进行调用) - Aaron
@Aaron 我认为这并不能解释为什么我的pickle函数甚至没有被调用。 - Booboo
不是说这种方法更好或更差,但在__init__中设置self.args = (a1, a2)对我来说更加简洁... self.args 几乎被所有异常类型使用。 - Aaron
@Aaron 这本质上是 Nathan Buckner 提出的解决方案。但请看我对他答案的评论。 - Booboo
我认为他的解决方案比我说的更好...在super().__init__之后直接再次设置self.args似乎更像是一种hack。话虽如此,我确实认为这更符合Exception子类化的意图。它保留了你给它的args,然后你可以自定义__str____repr__ - Aaron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接