PostgreSQL的FOR UPDATE SKIP LOCKED仍然会选择重复的行。

10

我正在使用PostgreSQL作为作业队列。以下是我的查询语句,用于检索作业并更新其状态:

        UPDATE requests AS re
        SET
          started_at = NOW(),
          finished_at = NULL
        FROM (
          SELECT
            _re.*
          FROM requests AS _re
          WHERE
            _re.state = 'pending'
          AND
            _re.started_at IS NULL
          LIMIT 1
          FOR UPDATE SKIP LOCKED
        ) AS sub
        WHERE re.id = sub.id
        RETURNING
          sub.*
现在,我有几台机器,在每台机器上都有一个具有多个线程的进程,并且在每个线程上都有一个工作程序。同一进程中的所有工作程序共享一个连接池,通常有10-20个连接。
问题是,以上查询将返回一些重复的行!
我找不到任何原因。有人可以帮忙吗?
更详细地说,我正在使用Python3和psycopg2。
更新:
我尝试了@a_horse_with_no_name的答案,但似乎不起作用。
我注意到,一个请求通过两个查询检索,并更新了started_at:
2016-04-21 14:23:06.970897 + 08

2016-04-21 14:23:06.831345 + 08
只相差0.14秒。
我想知道在这两个连接执行内部SELECT子查询的时间,两个锁是否都没有建立?
更新:
更准确地说,我在1台机器上的1个进程中有200个工作程序(即200个线程)。

跨贴 http://dba.stackexchange.com/q/136030/1822 - user330315
在PostgreSQL 9.1中是否有FOR UPDATE SKIP LOCKED?我认为没有。 - e4c5
@e4c5,我正在使用9.5版本。 - HanXu
@e4c5,是的。我在一个进程中有200个工作线程,并且最多有20个连接。 - HanXu
我认为在这种情况下,Redis是最好的选择。它专门为此类事情量身定制,而不幸的是关系型数据库并不适合。 - e4c5
显示剩余4条评论
2个回答

5
请注意,如果不想让多个线程相互干扰,每个线程都需要有自己的连接。
如果您的应用程序使用多个执行线程,它们不能同时共享一个连接。您必须明确地控制连接的访问(使用互斥锁)或为每个线程使用一个连接。如果每个线程使用自己的连接,则需要使用 AT 子句来指定线程将使用哪个连接。
详见:http://www.postgresql.org/docs/9.5/static/ecpg-connect.html 如果两个线程共享同一连接,将会发生各种奇怪的事情。我相信这就是您遇到问题的原因。如果您使用一个连接进行加锁,那么所有使用相同连接的其他线程都将可以访问被锁住的对象。
我建议您采取一种简单的替代方案,即使用 redis 作为队列。您可以简单地使用 redis-py 和 lpush/rpop 方法,或者使用 python-rq。

谢谢!我稍后会尝试。只是一个快速的问题,因为我有200个工作人员,这意味着我必须建立200个连接。那么如果我有50台机器,总共就是10000个连接。PostgreSQL能够承载这么多的连接吗? - HanXu
答案已更新,提供了一种替代方法。 - e4c5
@HanXu:10000个连接肯定太多了。我甚至怀疑,除非你有一个真正、真正强大的数据库服务器,拥有许多核心(>100)和大量硬盘,否则即使有200个并发工作线程,事情也不会变得更快。你应该限制你的工作线程(在数据库中执行某些操作)的数量,以避免过载数据库。或者使用连接池,确保单个连接永远不会被两个并发线程使用。 - user330315

1

在进行选择操作时,存在锁定事务尚未发出的可能性,或者在选择结果准备好并开始更新语句时,锁定丢失。您是否尝试明确开始一个事务?

BEGIN;
  WITH req AS (
    SELECT id
    FROM requests AS _re
    WHERE _re.state = 'pending' AND _re.started_at IS NULL
    LIMIT 1 FOR UPDATE SKIP LOCKED
    )
  UPDATE requests SET started_at = NOW(), finished_at = NULL
  FROM req
  WHERE requests.id = req.id;
COMMIT;

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接