在使用SQLAlchemy和多进程时Python脚本挂起

7
考虑以下使用SQLAlchemy和Python多进程模块的Python脚本。这是在Debian squeeze上使用Python 2.6.6-8+b1(默认)和SQLAlchemy 0.6.3-3(默认)的简化版本实际代码。
import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
r.get()
r.wait()
pool.close()
pool.join()

这个脚本出现以下错误信息后停止运行。
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))

当然,这里的语法错误是TRUNCATE foo%s;。我的问题是,为什么进程会挂起,我能否说服它退出并带有错误,而不需要对我的代码进行大量手术?这种行为非常类似于我的实际代码。
请注意,如果将该语句替换为print foobarbaz之类的内容,则不会发生挂起。此外,如果我们替换

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()

通过只使用Session.execute("TRUNCATE foo%s;")

我使用前一个版本,因为它更接近我的实际代码。

此外,从图片中删除multiprocessing并逐个循环表格可以消除挂起,并且只会出现错误。

我也有点困惑错误的形式,特别是TypeError: ('__init__() takes at least 4 arguments (2 given)'部分。这个错误来自哪里?似乎很可能是从multiprocessing代码中的某个地方引起的。

PostgreSQL 日志没有帮助。我看到很多像这样的行:

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

但没有其他相关的内容。

更新1:感谢lbolla及其深入分析,我能够提交一个Python缺陷报告。请在此报告中查看sbt的分析,以及此处。还可以查看Python bug报告Fix exception pickling。因此,根据sbt的说明,我们可以使用以下代码复现原始错误:

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)

这提供了

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)

更新2:这个问题已经被Mike Bayer修复,至少对于SQLAlchemy来说是这样的,详情请参见错误报告StatementError Exceptions un-pickable.。根据Mike的建议,我也向psycopg2报告了一个类似的bug,虽然我没有(也没有)实际的故障示例。不过无论如何,他们显然已经修复了它,尽管他们没有提供任何修复的细节。请参见psycopg exceptions cannot be pickled。为了保险起见,我还报告了一个Python bug ConfigParser exceptions are not pickleable,对应的是lbolla提到的SO问题。看起来他们想要一个测试来解决这个问题。

无论如何,这看起来在可预见的未来将继续成为一个问题,因为总的来说,Python开发人员似乎没有意识到这个问题,所以不会防范它。令人惊讶的是,似乎没有足够的人使用多处理来使这成为一个众所周知的问题,或者他们只是忍受它。我希望Python开发人员至少能在Python 3中修复它,因为这很烦人。

我接受了lbolla的答案,因为如果没有他对问题与异常处理的相关说明,我可能无法理解这个问题。我还要感谢sbt,他解释了Python不能pickle异常是问题所在。我非常感激他们两个人,请投票支持他们的答案。谢谢。

更新3:我发布了一个后续问题:Catching unpickleable exceptions and re-raising
4个回答

11

我认为TypeError来自于multiprocessingget方法。

我从你的脚本中剥离了所有的数据库代码,看一下这个:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results
使用r.wait返回预期结果,但使用r.get会引发TypeError。根据Python文档中的描述,在map_async之后使用r.wait

编辑:我必须修改我的先前答案。我现在相信TypeError来自SQLAlchemy。我已经修改了我的脚本以重现这个错误。

编辑2:看起来问题是multiprocessing.pool不允许任何工作进程引发构造函数需要参数的异常(也可以参见这里)。

我修改了我的脚本以强调这一点。

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

在您的情况下,鉴于您的代码引发了SQLAlchemy异常,我能想到的唯一解决方案是捕获do函数中的所有异常并重新引发一个普通的Exception。类似于这样:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

编辑3:看起来这是一个Python的bug,但在SQLAlchemy中使用适当的异常处理可以解决它:因此,我也向SQLAlchemy提出了这个问题

作为解决方法,我认为编辑2末尾的方案可行(在回调函数中加入try-except并重新引发异常)。


仅使用get的回溯是File "<stdin>", line 17, in <module> File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foo 1 7 8 9 0 2 3 4 5 6 - Faheem Mitha
使用您修改后的脚本后,我确实遇到了非常类似的错误,即0 1 2 3 4 5 6 7 8 9 线程“Thread-2”中的异常:Traceback(最新调用) last):File“/usr/lib/python2.6/threading.py”,line 532,在__bootstrap_inner中 self.run() File“/usr/lib/python2.6/threading.py”,line 484,在run中self.__target(*self.__args,**self.__kwargs) File“/usr/lib/python2.6/multiprocessing/pool.py”,line 259,in_handle_results task = get() TypeError:('__init __()需要至少4个参数(给出2个)',<class 'sqlalchemy.exc.ProgrammingError'>,('(NoneType)None',))。 - Faheem Mitha
同样的问题出现在3.1版本中,尽管回溯信息稍有不同。通过您的GoodExc/BadExc我得到了… - Faheem Mitha
我已编辑了我的答案以反映这一点,并提出了SQLAlchemy的问题。如果您对答案满意,请批准它,以便我们可以关闭这个线程。 - lbolla
SQLA的错误已经修复,请查看我的更新。再次感谢您对此的帮助。 - Faheem Mitha
显示剩余11条评论

2

TypeError: ('__init__() takes at least 4 arguments (2 given)错误与您尝试执行的sql无关,而是与您如何使用SqlAlchemy的API有关。

问题在于您尝试在会话类上调用execute,而不是该会话的实例。

请尝试以下操作:

session = Session()
session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
session.commit()

根据文档

sessionmaker() 函数旨在在应用程序的全局范围内调用,并将返回的类作为实例化会话的唯一类提供给应用程序的其余部分。

因此,Session = sessionmaker() 返回一个新的会话类,session = Session() 返回该类的一个实例,您可以在该实例上调用 execute


嗨,Stephen,我正在使用的习语来自例如创建线程本地上下文(0.6版本)。我相信这是在您使用多个线程时使用的。由于我正在使用多个进程,所以我想我不需要使用它-它是从先前使用线程的版本中留下的,但我在保持线程分离方面遇到了一些问题。无论如何,我相信这与我看到的问题无关。 - Faheem Mitha
为了确保,我已经用你提到的传统语法替换了scoped_session语法,但是并没有任何区别。 - Faheem Mitha

1

我不知道原始异常的原因。然而,multiprocessing 对于“不良”异常的问题实际上是由 pickling 的工作方式引起的。我认为 sqlachemy 异常类有缺陷。

如果一个异常类有一个未调用 BaseException.__init__() 的 __init__() 方法(直接或间接地),那么 self.args 可能不会被正确设置。BaseException.__reduce__()(pickle 协议使用)假定一个异常 e 的副本可以通过仅执行下面的操作来重新创建:

type(e)(*e.args)

例如。
>>> e = ValueError("bad value")
>>> e
ValueError('bad value',)
>>> type(e)(*e.args)
ValueError('bad value',)

如果这个不变量不成立,那么序列化/反序列化将会失败。因此,实例
class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

可以被腌制,但结果不能被解腌:

>>> from cPickle import loads, dumps
>>> class BadExc(Exception):
...     def __init__(self, a):
...         '''Non-optional param in the constructor.'''
...         self.a = a
...
>>> loads(dumps(BadExc(1)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

但是实例

class GoodExc1(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        Exception.__init__(self, a)
        self.a = a

或者

class GoodExc2(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.args = (a,)
        self.a = a

可以成功地进行序列化/反序列化。

因此,您应该要求SQLAlchemy的开发人员修复他们的异常类。同时,您可能可以使用copy_reg.pickle()来覆盖有问题的类的BaseException.__reduce__()方法。


非常感谢您的解释。您能否说明在这种情况下如何使用copy_reg.pickle()来覆盖BaseException.__reduce__()?谢谢。 - Faheem Mitha
这里是SQLAlchemy的异常类。有没有一种方法可以一次性覆盖所有类? - Faheem Mitha

1

(这是回答 Faheem Mitha 在评论中如何使用 copy_reg 来解决损坏的异常类问题的答案。)

SQLAlchemy 异常类的 __init__() 方法似乎调用了它们基类的 __init__() 方法,但参数不同。这会搞乱 pickling。

要自定义 sqlalchemy 异常类的 pickling,您可以使用 copy_reg 注册自己的 reduce 函数来处理这些类。

reduce 函数接受一个参数 obj,并返回一对 (callable_obj, args),以便通过执行 callable_obj(*args) 创建 obj 的副本。例如:

class StatementError(SQLAlchemyError):
    def __init__(self, message, statement, params, orig):
        SQLAlchemyError.__init__(self, message)
        self.statement = statement
        self.params = params
        self.orig = orig
    ...

可以通过以下方式“修复”

import copy_reg, sqlalchemy.exc

def reduce_StatementError(e):
    message = e.args[0]
    args = (message, e.statement, e.params, e.orig)
    return (type(e), args)

copy_reg.pickle(sqlalchemy.exc.StatementError, reduce_StatementError)

sqlalchemy.exc中还有几个其他的类需要类似地修复。但希望你能理解这个想法。


仔细想想,与其逐个修复每个类,你可能只需要猴子补丁基本异常类的__reduce__()方法:

import sqlalchemy.exc

def rebuild_exc(cls, args, dic):
    e = Exception.__new__(cls)
    e.args = args
    e.__dict__.update(dic)
    return e

def __reduce__(e):
    return (rebuild_exc, (type(e), e.args, e.__dict__))

sqlalchemy.exc.SQLAlchemyError.__reduce__ = __reduce__

嗨sbt,非常感谢您的回答,它非常有帮助。正如我在更新中提到的那样,Mike Bayer现在已经修复了SQLAlchemy的问题,尽管它可能会继续成为其他库的问题。不过我想知道,为什么您只有11个声望,而有2个赞同?这应该是21,对吧?关于处理错误,我倾向于尝试捕获并重新引发从多进程作业内部抛出的异常,就像lbolla建议的那样,因为即使我修复了SQLA异常,我也无法确定是否会抛出来自SQLA之外的异常。 - Faheem Mitha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接