Sqlalchemy中的session.rollback在IntegrityError的情况下是否会导致queuepool耗尽处理程序?

3
我有以下表格:
class Feedback(Base):
  __tablename__ = 'feedbacks'
  __table_args__ = (UniqueConstraint('user_id', 'look_id'),)
  id = Column(Integer, primary_key=True)
  user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
  look_id = Column(Integer, ForeignKey('looks.id'), nullable=False)

我目前正在向这个表中插入大量条目,这将违反唯一约束条件。
我正在使用以下代码:
  for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    feedback = Feedback()
    feedback.user_id = User.get_or_create(comment.model_id).id
    feedback.look_id = comment.commentable_id
    session.add(feedback)
    try:        # Refer to T20
      session.flush()
    except IntegrityError,e:
      print "IntegrityError", e
      session.rollback()
  session.commit()

"我遇到了以下错误:"
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
DETAIL:  Key (user_id, look_id)=(140, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 140, 'score': 1, 'look_id': 263008}
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
...
(there's about 24 of these integrity errors here)
...
DETAIL:  Key (user_id, look_id)=(173, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 173, 'score': 1, 'look_id': 263008}
No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
  File "load.py", line 40, in <module>
    load_crawl_data_into_feedback()
  File "load.py", line 21, in load_crawl_data_into_feedback
    for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2337, in instances
    fetch = cursor.fetchmany(self._yield_per)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3230, in fetchmany
    self.cursor, self.context)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3223, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3343, in _fetchmany_impl
    row = self._fetchone_impl()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3333, in _fetchone_impl
    self.__buffer_rows()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3326, in __buffer_rows
    self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.ProgrammingError: (ProgrammingError) named cursor isn't valid anymore None None

在你就yield_per导致错误下结论之前,我可以向你保证yield_per并不是罪魁祸首。

我尝试了相同的代码,但没有使用唯一约束,完全没有出现任何错误。

我认为完整性错误导致了No handlers could be found for logger "sqlalchemy.pool.QueuePool"

我假设每个完整性错误都会杀死队列池中的每个“线程”。

有人能够告诉我正在发生什么吗?

如果此时无法对数据做出太多改变,你会建议我怎么做?

2个回答

5
那个错误是来自Python的logging模块;你的池类正在尝试记录一些调试信息,但你没有配置SQLA日志记录。配置日志记录很容易,然后你就可以看到它实际上想说什么了。
我不太确定这里到底发生了什么,但你回滚顶层事务数十次肯定不会有所帮助。回滚会结束事务并使每个活动行对象无效。这肯定与yield_per不兼容。
如果你的数据库支持保存点或嵌套事务(即Postgres或Oracle...或者最近的MySQL?),尝试为每个尝试启动一个嵌套事务:
for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    try:
        with session.begin_nested():
            feedback = Feedback()
            feedback.user_id = User.get_or_create(comment.model_id).id
            feedback.look_id = comment.commentable_id
            session.add(feedback)
            session.flush()
    except IntegrityError, e:
        print "IntegrityError", e

session.commit()

with 在出现错误时会回滚并在成功时提交,因此失败的flush不会对主事务的其余部分造成破坏。

如果您没有后端支持,其他明智的选择包括:

  • 复杂化您的查询:使用LEFT JOIN与反馈表连接,以便在应用程序中知道反馈行是否已存在。

  • 如果您愿意将(user_id, look_id)作为您的主键,我认为您可以使用session.merge(feedback)。 这类似于基于主键的插入或更新:如果SQLA可以找到具有相同pk的现有行,则会更新该行,否则它将在数据库中创建一个新行。 不过可能会为每个新行触发额外的SELECT


4
在你对yield_per这个错误下结论之前,我可以向你保证,yield_per不是罪魁祸首。
我不确定你为什么会这样想 - yield_per() 在这里非常重要,通过简单地尝试同样的测试没有使用yield_per()来查看行为是否不同,就可以迅速发现这一点。 通过使用yield_per(),psycopg2游标在循环继续时保持打开状态。 然后,你通过session.rollback()在psycopg2连接上发出一个ROLLBACK。 这正好会导致出现“命名游标不再有效”的错误。 实际上,之所以存在命名游标,是因为这是使用psycopg2进行服务器端游标的方法,这也是yield_per()的一部分。
"我尝试了没有唯一约束的相同代码,并且根本没有遇到任何错误。"
这是因为没有约束条件,不会抛出异常,也不会触发rollback()。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接