Python 3.4多进程在unittest中无法正常工作

3

我有一个使用多进程的单元测试。

从Python 3.2升级到Python 3.4后,我遇到了以下错误。我找不到任何提示,Python内部发生了什么变化以及我需要改变什么来使我的代码运行。

提前感谢。

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python341_64\lib\multiprocessing\spawn.py", line 106, in spawn_main
    exitcode = _main(fd)
  File "C:\Python341_64\lib\multiprocessing\spawn.py", line 116, in _main
    self = pickle.load(from_parent)
EOFError: Ran out of input

Error
Traceback (most recent call last):
  File "D:\test_multiproc.py", line 46, in testSmallWorkflow
    p.start()
  File "C:\Python341_64\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python341_64\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python341_64\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Python341_64\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python341_64\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

以下是一个示例代码,展示我如何重现这个错误:

import shutil
import traceback
import unittest
import time
from multiprocessing import Process
import os


class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(5)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def info(self, title):
        print(title)
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        self.info('main line')
        p = Process(target=self.f, args=('bob',))
        p.start()
        p.join()
    #---------------------------------------------------------------------------
1个回答

7

问题是unittest.TestCase类本身不再可pickleable,而您必须pickle它才能pickle其绑定方法之一(self.f)。一个简单的解决方法是为您需要在子进程中调用的方法创建一个单独的类:

class Tester:
    def info(self, title=None):
        print("title {}".format(title))
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #-------------------------------


class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(5)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        t = Tester()
        self.info('main line')
        p = Process(target=t.f, args=('bob',))
        p.start()
        p.join()

或者,您可以使用__setstate__/__getstate__来删除无法序列化的TestCase中的对象。在这种情况下,它是一个名为_Outcome的内部类。我们在子类中不关心它,所以我们可以从序列化状态中删除它:

class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(2)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def info(self, title=None):
        print("title {}".format(title))
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        t = Tester()
        self.info('main line')
        p = Process(target=self.f, args=('bob',))
        p.start()
        p.join()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['_outcome']
        return self_dict

    def __setstate(self, state):
        self.__dict__.update(self_dict)

非常感谢您提供如此有用和快速的答案。您说,unittest.Testcase已经不再可选。您能否为我提供更多关于此事的信息以及原因?也许有一些链接可以让我阅读相关内容吗? - knumskull
1
@knumskull 我不知道这是否有记录; 我只是通过查看代码找到了解决方法。问题在于 TestCase 内部使用的 _Outcome 对象包含一个 result 属性,该属性是一个 unittest.runner.TextTestResult 实例。这个类负责将每个测试的结果写入屏幕。它包含对 _io.TextIoWrapper 对象的引用,这些对象无法被 pickle。如果我这周找到一些时间,我可能会深入研究一下 3.2 和 3.4 之间发生了什么变化,并提供一个修补程序,使 TestCase 可以再次被 pickle。 - dano
感谢您的解释。它对我很有帮助。 - knumskull

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接